Add config option for remote job name (#6043)

* Track remote write queues via a map so we don't care about index.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Support a job name for remote write/read so we can differentiate between
them using the name.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Remote write/read has Name to not confuse the meaning of the field with
scrape job names.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Split queue/client label into remote_name and url labels.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Don't allow for duplicate remote write/read configs.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Ensure we restart remote write queues if the hash of their config has
not changed, but the remote name has changed.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Include name in remote read/write config hashes, simplify duplicates
check, update test accordingly.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2019-12-12 12:47:23 -08:00 committed by GitHub
parent cccd542891
commit 67838643ee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 483 additions and 123 deletions

View file

@ -265,15 +265,27 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
}
jobNames[scfg.JobName] = struct{}{}
}
rwNames := map[string]struct{}{}
for _, rwcfg := range c.RemoteWriteConfigs {
if rwcfg == nil {
return errors.New("empty or null remote write config section")
}
// Skip empty names, we fill their name with their config hash in remote write code.
if _, ok := rwNames[rwcfg.Name]; ok && rwcfg.Name != "" {
return errors.Errorf("found multiple remote write configs with job name %q", rwcfg.Name)
}
rwNames[rwcfg.Name] = struct{}{}
}
rrNames := map[string]struct{}{}
for _, rrcfg := range c.RemoteReadConfigs {
if rrcfg == nil {
return errors.New("empty or null remote read config section")
}
// Skip empty names, we fill their name with their config hash in remote read code.
if _, ok := rrNames[rrcfg.Name]; ok && rrcfg.Name != "" {
return errors.Errorf("found multiple remote read configs with job name %q", rrcfg.Name)
}
rrNames[rrcfg.Name] = struct{}{}
}
return nil
}
@ -596,6 +608,7 @@ type RemoteWriteConfig struct {
URL *config_util.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
Name string `yaml:"name,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
@ -654,6 +667,8 @@ type RemoteReadConfig struct {
URL *config_util.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
ReadRecent bool `yaml:"read_recent,omitempty"`
Name string `yaml:"name,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig config_util.HTTPClientConfig `yaml:",inline"`

View file

@ -73,6 +73,7 @@ var expectedConf = &Config{
{
URL: mustParseURL("http://remote1/push"),
RemoteTimeout: model.Duration(30 * time.Second),
Name: "drop_expensive",
WriteRelabelConfigs: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
@ -88,6 +89,7 @@ var expectedConf = &Config{
URL: mustParseURL("http://remote2/push"),
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: DefaultQueueConfig,
Name: "rw_tls",
HTTPClientConfig: config_util.HTTPClientConfig{
TLSConfig: config_util.TLSConfig{
CertFile: filepath.FromSlash("testdata/valid_cert_file"),
@ -102,11 +104,13 @@ var expectedConf = &Config{
URL: mustParseURL("http://remote1/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ReadRecent: true,
Name: "default",
},
{
URL: mustParseURL("http://remote3/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ReadRecent: false,
Name: "read_special",
RequiredMatchers: model.LabelSet{"job": "special"},
HTTPClientConfig: config_util.HTTPClientConfig{
TLSConfig: config_util.TLSConfig{
@ -825,6 +829,12 @@ var expectedErrors = []struct {
}, {
filename: "remote_write_url_missing.bad.yml",
errMsg: `url for remote_write is empty`,
}, {
filename: "remote_write_dup.bad.yml",
errMsg: `found multiple remote write configs with job name "queue1"`,
}, {
filename: "remote_read_dup.bad.yml",
errMsg: `found multiple remote read configs with job name "queue1"`,
},
{
filename: "ec2_filters_empty_values.bad.yml",

View file

@ -14,11 +14,13 @@ rule_files:
remote_write:
- url: http://remote1/push
name: drop_expensive
write_relabel_configs:
- source_labels: [__name__]
regex: expensive.*
action: drop
- url: http://remote2/push
name: rw_tls
tls_config:
cert_file: valid_cert_file
key_file: valid_key_file
@ -26,8 +28,10 @@ remote_write:
remote_read:
- url: http://remote1/read
read_recent: true
name: default
- url: http://remote3/read
read_recent: false
name: read_special
required_matchers:
job: special
tls_config:

View file

@ -0,0 +1,5 @@
remote_read:
- url: http://localhost:9090
name: queue1
- url: localhost:9091
name: queue1

View file

@ -0,0 +1,6 @@
remote_write:
- url: localhost:9090
name: queue1
- url: localhost:9091
name: queue1

View file

@ -40,7 +40,7 @@ var userAgent = fmt.Sprintf("Prometheus/%s", version.Version)
// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
index int // Used to differentiate clients in metrics.
remoteName string // Used to differentiate clients in metrics.
url *config_util.URL
client *http.Client
timeout time.Duration
@ -54,14 +54,14 @@ type ClientConfig struct {
}
// NewClient creates a new Client.
func NewClient(index int, conf *ClientConfig) (*Client, error) {
func NewClient(remoteName string, conf *ClientConfig) (*Client, error) {
httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false)
if err != nil {
return nil, err
}
return &Client{
index: index,
remoteName: remoteName,
url: conf.URL,
client: httpClient,
timeout: time.Duration(conf.Timeout),
@ -115,9 +115,14 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
return err
}
// Name identifies the client.
// Name uniquely identifies the client.
func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url)
return c.remoteName
}
// Endpoint returns the remote read or write endpoint of this client, i.e. the
// configured URL rendered as a string via c.url.String().
// NOTE(review): c.url is dereferenced unconditionally; presumably every client
// is constructed with a non-nil URL — confirm against callers.
func (c Client) Endpoint() string {
return c.url.String()
}
// Read reads from a remote endpoint.

View file

@ -62,18 +62,18 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
)
serverURL, err := url.Parse(server.URL)
if err != nil {
t.Fatal(err)
}
testutil.Ok(t, err)
c, err := NewClient(0, &ClientConfig{
conf := &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
})
if err != nil {
t.Fatal(err)
}
hash, err := toHash(conf)
testutil.Ok(t, err)
c, err := NewClient(hash, conf)
testutil.Ok(t, err)
err = c.Store(context.Background(), []byte{})
if !testutil.ErrorEqual(err, test.err) {
t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)

View file

@ -36,12 +36,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wal"
)
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "remote_storage"
queue = "queue"
// We track samples in/out and how long pushes take using an Exponentially
// Weighted Moving Average.
ewmaWeight = 0.2
@ -59,7 +54,7 @@ var (
Name: "succeeded_samples_total",
Help: "Total number of samples successfully sent to remote storage.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
failedSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -68,7 +63,7 @@ var (
Name: "failed_samples_total",
Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
retriedSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -77,7 +72,7 @@ var (
Name: "retried_samples_total",
Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
droppedSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -86,7 +81,7 @@ var (
Name: "dropped_samples_total",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
enqueueRetriesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -95,7 +90,7 @@ var (
Name: "enqueue_retries_total",
Help: "Total number of times enqueue has failed because a shards queue was full.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
sentBatchDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
@ -105,7 +100,7 @@ var (
Help: "Duration of sample batch send calls to the remote storage.",
Buckets: prometheus.DefBuckets,
},
[]string{queue},
[]string{remoteName, endpoint},
)
queueHighestSentTimestamp = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -114,7 +109,7 @@ var (
Name: "queue_highest_sent_timestamp_seconds",
Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
queuePendingSamples = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -123,7 +118,7 @@ var (
Name: "pending_samples",
Help: "The number of samples pending in the queues shards to be sent to the remote storage.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
shardCapacity = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -132,7 +127,7 @@ var (
Name: "shard_capacity",
Help: "The capacity of each shard of the queue used for parallel sending to the remote storage.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
numShards = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -141,7 +136,7 @@ var (
Name: "shards",
Help: "The number of shards used for parallel sending to the remote storage.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
maxNumShards = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -150,7 +145,7 @@ var (
Name: "shards_max",
Help: "The maximum number of shards that the queue is allowed to run.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
minNumShards = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -159,7 +154,7 @@ var (
Name: "shards_min",
Help: "The minimum number of shards that the queue is allowed to run.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
desiredNumShards = promauto.NewGaugeVec(
prometheus.GaugeOpts{
@ -168,7 +163,7 @@ var (
Name: "shards_desired",
Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
bytesSent = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -177,7 +172,7 @@ var (
Name: "sent_bytes_total",
Help: "The total number of bytes sent by the queue.",
},
[]string{queue},
[]string{remoteName, endpoint},
)
)
@ -186,8 +181,10 @@ var (
type StorageClient interface {
// Store stores the given samples in the remote storage.
Store(context.Context, []byte) error
// Name identifies the remote storage implementation.
// Name uniquely identifies the remote storage.
Name() string
// Endpoint is the remote read or write endpoint for the storage client.
Endpoint() string
}
// QueueManager manages a queue of samples to be sent to the Storage
@ -242,8 +239,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
logger = log.NewNopLogger()
}
name := client.Name()
logger = log.With(logger, "queue", name)
logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint())
t := &QueueManager{
logger: logger,
flushDeadline: flushDeadline,
@ -266,7 +262,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
}
t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, name, t, walDir)
t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir)
t.shards = t.newShards()
return t
@ -326,22 +322,23 @@ func (t *QueueManager) Start() {
// constructor because of the ordering of creating Queue Managers, stopping them,
// and then starting new ones in storage/remote/storage.go ApplyConfig.
name := t.client.Name()
ep := t.client.Endpoint()
t.highestSentTimestampMetric = &maxGauge{
Gauge: queueHighestSentTimestamp.WithLabelValues(name),
Gauge: queueHighestSentTimestamp.WithLabelValues(name, ep),
}
t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name)
t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name)
t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name)
t.numShardsMetric = numShards.WithLabelValues(name)
t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name)
t.sentBatchDuration = sentBatchDuration.WithLabelValues(name)
t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name)
t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name)
t.shardCapacity = shardCapacity.WithLabelValues(name)
t.maxNumShards = maxNumShards.WithLabelValues(name)
t.minNumShards = minNumShards.WithLabelValues(name)
t.desiredNumShards = desiredNumShards.WithLabelValues(name)
t.bytesSent = bytesSent.WithLabelValues(name)
t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name, ep)
t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name, ep)
t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name, ep)
t.numShardsMetric = numShards.WithLabelValues(name, ep)
t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name, ep)
t.sentBatchDuration = sentBatchDuration.WithLabelValues(name, ep)
t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name, ep)
t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name, ep)
t.shardCapacity = shardCapacity.WithLabelValues(name, ep)
t.maxNumShards = maxNumShards.WithLabelValues(name, ep)
t.minNumShards = minNumShards.WithLabelValues(name, ep)
t.desiredNumShards = desiredNumShards.WithLabelValues(name, ep)
t.bytesSent = bytesSent.WithLabelValues(name, ep)
// Initialise some metrics.
t.shardCapacity.Set(float64(t.cfg.Capacity))
@ -380,19 +377,20 @@ func (t *QueueManager) Stop() {
t.seriesMtx.Unlock()
// Delete metrics so we don't have alerts for queues that are gone.
name := t.client.Name()
queueHighestSentTimestamp.DeleteLabelValues(name)
queuePendingSamples.DeleteLabelValues(name)
enqueueRetriesTotal.DeleteLabelValues(name)
droppedSamplesTotal.DeleteLabelValues(name)
numShards.DeleteLabelValues(name)
failedSamplesTotal.DeleteLabelValues(name)
sentBatchDuration.DeleteLabelValues(name)
succeededSamplesTotal.DeleteLabelValues(name)
retriedSamplesTotal.DeleteLabelValues(name)
shardCapacity.DeleteLabelValues(name)
maxNumShards.DeleteLabelValues(name)
minNumShards.DeleteLabelValues(name)
desiredNumShards.DeleteLabelValues(name)
ep := t.client.Endpoint()
queueHighestSentTimestamp.DeleteLabelValues(name, ep)
queuePendingSamples.DeleteLabelValues(name, ep)
enqueueRetriesTotal.DeleteLabelValues(name, ep)
droppedSamplesTotal.DeleteLabelValues(name, ep)
numShards.DeleteLabelValues(name, ep)
failedSamplesTotal.DeleteLabelValues(name, ep)
sentBatchDuration.DeleteLabelValues(name, ep)
succeededSamplesTotal.DeleteLabelValues(name, ep)
retriedSamplesTotal.DeleteLabelValues(name, ep)
shardCapacity.DeleteLabelValues(name, ep)
maxNumShards.DeleteLabelValues(name, ep)
minNumShards.DeleteLabelValues(name, ep)
desiredNumShards.DeleteLabelValues(name, ep)
}
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.

View file

@ -446,6 +446,10 @@ func (c *TestStorageClient) Name() string {
return "teststorageclient"
}
// Endpoint returns a fixed placeholder URL; it does not depend on any state
// of the test client.
func (c *TestStorageClient) Endpoint() string {
return "http://test-remote.com/1234"
}
// TestBlockingStorageClient is a queue_manager StorageClient which will block
// on any calls to Store(), until the request's Context is cancelled, at which
// point the `numCalls` property will contain a count of how many times Store()
@ -472,6 +476,10 @@ func (c *TestBlockingStorageClient) Name() string {
return "testblockingstorageclient"
}
// Endpoint returns a fixed placeholder URL; it does not depend on any state
// of the blocking test client.
func (c *TestBlockingStorageClient) Endpoint() string {
return "http://test-remote-blocking.com/1234"
}
func BenchmarkSampleDelivery(b *testing.B) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.

View file

@ -29,7 +29,7 @@ var remoteReadQueries = prometheus.NewGaugeVec(
Name: "remote_read_queries",
Help: "The number of in-flight remote read queries.",
},
[]string{"client"},
[]string{remoteName, endpoint},
)
func init() {
@ -39,7 +39,7 @@ func init() {
// QueryableClient returns a storage.Queryable which queries the given
// Client to select series sets.
func QueryableClient(c *Client) storage.Queryable {
remoteReadQueries.WithLabelValues(c.Name())
remoteReadQueries.WithLabelValues(c.remoteName, c.url.String())
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querier{
ctx: ctx,
@ -65,7 +65,7 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (
return nil, nil, err
}
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name())
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.remoteName, q.client.url.String())
remoteReadGauge.Inc()
defer remoteReadGauge.Dec()

View file

@ -15,15 +15,98 @@ package remote
import (
"context"
"io/ioutil"
"net/url"
"os"
"reflect"
"sort"
"testing"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/testutil"
)
// TestNoDuplicateReadConfigs checks how Storage.ApplyConfig treats remote read
// configs that point at the same endpoint: configs that are identical in every
// field (including the name) must be rejected, while configs that differ only
// by name must be accepted.
func TestNoDuplicateReadConfigs(t *testing.T) {
	dir, err := ioutil.TempDir("", "TestNoDuplicateReadConfigs")
	testutil.Ok(t, err)
	defer os.RemoveAll(dir)

	// All three configs share the same URL; only the name differs.
	cfg1 := config.RemoteReadConfig{
		Name: "read-1",
		URL: &config_util.URL{
			URL: &url.URL{
				Scheme: "http",
				Host:   "localhost",
			},
		},
	}
	cfg2 := config.RemoteReadConfig{
		Name: "read-2",
		URL: &config_util.URL{
			URL: &url.URL{
				Scheme: "http",
				Host:   "localhost",
			},
		},
	}
	// cfg3 deliberately has no name set.
	cfg3 := config.RemoteReadConfig{
		URL: &config_util.URL{
			URL: &url.URL{
				Scheme: "http",
				Host:   "localhost",
			},
		},
	}

	type testcase struct {
		cfgs []*config.RemoteReadConfig
		err  bool
	}

	cases := []testcase{
		{ // Two identical named configs, we should get an error.
			cfgs: []*config.RemoteReadConfig{
				&cfg1,
				&cfg1,
			},
			err: true,
		},
		{ // Duplicates but with different names, we should not get an error.
			cfgs: []*config.RemoteReadConfig{
				&cfg1,
				&cfg2,
			},
			err: false,
		},
		{ // Duplicates but one with no name, we should not get an error.
			cfgs: []*config.RemoteReadConfig{
				&cfg1,
				&cfg3,
			},
			err: false,
		},
		{ // Duplicates both with no name, we should get an error.
			cfgs: []*config.RemoteReadConfig{
				&cfg3,
				&cfg3,
			},
			err: true,
		},
	}

	for _, tc := range cases {
		// A fresh Storage per case so that earlier cases cannot influence
		// the outcome of later ones.
		s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline)
		conf := &config.Config{
			GlobalConfig:      config.DefaultGlobalConfig,
			RemoteReadConfigs: tc.cfgs,
		}
		err := s.ApplyConfig(conf)
		gotError := err != nil
		testutil.Equals(t, tc.err, gotError)

		err = s.Close()
		testutil.Ok(t, err)
	}
}
func TestExternalLabelsQuerierSelect(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "job", "api-server"),

View file

@ -15,6 +15,10 @@ package remote
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"
@ -28,6 +32,14 @@ import (
"github.com/prometheus/prometheus/storage"
)
// String constants for instrumentation.
const (
// NOTE(review): namespace and subsystem are presumably the metric name
// prefix parts shared by the remote storage metrics — confirm at the
// metric definitions.
namespace = "prometheus"
subsystem = "remote_storage"
// remoteName is the label/log key that carries the name of a remote
// read or write client.
remoteName = "remote_name"
// endpoint is the label/log key that carries the client's URL.
endpoint = "url"
)
// startTimeCallback is a callback func that return the oldest timestamp stored in a storage.
type startTimeCallback func() (int64, error)
@ -67,9 +79,29 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
}
// Update read clients
readHashes := make(map[string]struct{})
queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs))
for i, rrConf := range conf.RemoteReadConfigs {
c, err := NewClient(i, &ClientConfig{
for _, rrConf := range conf.RemoteReadConfigs {
hash, err := toHash(rrConf)
if err != nil {
	// Propagate the hashing failure. Returning nil here would report
	// success to the caller while leaving the function before the
	// remaining remote read configs are processed.
	return err
}
// Don't allow duplicate remote read configs.
if _, ok := readHashes[hash]; ok {
return fmt.Errorf("duplicate remote read configs are not allowed, found duplicate for URL: %s", rrConf.URL)
}
readHashes[hash] = struct{}{}
// Set the client name to the config hash if the user has not set
// a name in their remote read config so we can still differentiate
// between clients that have the same remote read endpoint.
name := string(hash[:6])
if rrConf.Name != "" {
name = rrConf.Name
}
c, err := NewClient(name, &ClientConfig{
URL: rrConf.URL,
Timeout: rrConf.RemoteTimeout,
HTTPClientConfig: rrConf.HTTPClientConfig,
@ -139,3 +171,13 @@ func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher {
}
return ms
}
// toHash returns the hex-encoded MD5 digest of the JSON encoding of data.
// It is used for hashing configs and diff'ing those hashes in ApplyConfig.
// If data cannot be marshalled to JSON, the marshalling error is returned
// together with an empty string.
func toHash(data interface{}) (string, error) {
	encoded, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return "", marshalErr
	}

	digest := md5.Sum(encoded)
	return hex.EncodeToString(digest[:]), nil
}

View file

@ -15,10 +15,12 @@ package remote
import (
"io/ioutil"
"net/url"
"os"
"testing"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/testutil"
)
@ -38,6 +40,18 @@ func TestStorageLifecycle(t *testing.T) {
&config.DefaultRemoteReadConfig,
},
}
// We need to set URLs so that metric creation doesn't panic.
conf.RemoteWriteConfigs[0].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
conf.RemoteReadConfigs[0].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
s.ApplyConfig(conf)
// make sure remote write has a queue.

View file

@ -14,8 +14,7 @@
package remote
import (
"crypto/md5"
"encoding/json"
"fmt"
"sync"
"time"
@ -50,11 +49,10 @@ type WriteStorage struct {
logger log.Logger
mtx sync.Mutex
configHash [16]byte
externalLabelHash [16]byte
configHash string
externalLabelHash string
walDir string
queues []*QueueManager
hashes [][16]byte
queues map[string]*QueueManager
samplesIn *ewmaRate
flushDeadline time.Duration
}
@ -65,6 +63,7 @@ func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Durati
logger = log.NewNopLogger()
}
rws := &WriteStorage{
queues: make(map[string]*QueueManager),
logger: logger,
flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
@ -88,20 +87,17 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.mtx.Lock()
defer rws.mtx.Unlock()
// Remote write queues only need to change if the remote write config or
// external labels change. Hash these together and only reload if the hash
// changes.
cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs)
configHash, err := toHash(conf.RemoteWriteConfigs)
if err != nil {
return err
}
externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels)
externalLabelHash, err := toHash(conf.GlobalConfig.ExternalLabels)
if err != nil {
return err
}
configHash := md5.Sum(cfgBytes)
externalLabelHash := md5.Sum(externalLabelBytes)
// Remote write queues only need to change if the remote write config or
// external labels change.
externalLabelUnchanged := externalLabelHash == rws.externalLabelHash
if configHash == rws.configHash && externalLabelUnchanged {
level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers")
@ -111,28 +107,39 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.configHash = configHash
rws.externalLabelHash = externalLabelHash
// Update write queues
newQueues := []*QueueManager{}
newHashes := [][16]byte{}
newClientIndexes := []int{}
for i, rwConf := range conf.RemoteWriteConfigs {
b, err := json.Marshal(rwConf)
newQueues := make(map[string]*QueueManager)
newHashes := []string{}
for _, rwConf := range conf.RemoteWriteConfigs {
hash, err := toHash(rwConf)
if err != nil {
return err
}
// Use RemoteWriteConfigs and its index to get hash. So if its index changed,
// the corresponding queue should also be restarted.
hash := md5.Sum(b)
if i < len(rws.queues) && rws.hashes[i] == hash && externalLabelUnchanged {
// The RemoteWriteConfig and index both not changed, keep the queue.
newQueues = append(newQueues, rws.queues[i])
newHashes = append(newHashes, hash)
rws.queues[i] = nil
// Set the queue name to the config hash if the user has not set
// a name in their remote write config so we can still differentiate
// between queues that have the same remote write endpoint.
name := string(hash[:6])
if rwConf.Name != "" {
name = rwConf.Name
}
// Don't allow duplicate remote write configs.
if _, ok := newQueues[hash]; ok {
return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL)
}
var nameUnchanged bool
queue, ok := rws.queues[hash]
if ok {
nameUnchanged = queue.client.Name() == name
}
if externalLabelUnchanged && nameUnchanged {
newQueues[hash] = queue
delete(rws.queues, hash)
continue
}
// Otherwise create a new queue.
c, err := NewClient(i, &ClientConfig{
c, err := NewClient(name, &ClientConfig{
URL: rwConf.URL,
Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig,
@ -140,7 +147,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
if err != nil {
return err
}
newQueues = append(newQueues, NewQueueManager(
newQueues[hash] = NewQueueManager(
prometheus.DefaultRegisterer,
rws.logger,
rws.walDir,
@ -150,24 +157,22 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.WriteRelabelConfigs,
c,
rws.flushDeadline,
))
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)
newClientIndexes = append(newClientIndexes, i)
}
// Anything remaining in rws.queues is a queue whose config has
// changed or was removed from the overall remote write config.
for _, q := range rws.queues {
// A nil queue means that queue has been reused.
if q != nil {
q.Stop()
}
}
for _, index := range newClientIndexes {
newQueues[index].Start()
for _, hash := range newHashes {
newQueues[hash].Start()
}
rws.queues = newQueues
rws.hashes = newHashes
return nil
}

View file

@ -15,16 +15,145 @@ package remote
import (
"io/ioutil"
"net/url"
"os"
"testing"
"time"
common_config "github.com/prometheus/common/config"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/util/testutil"
)
// cfg is a package-level remote write config fixture named "dev" that points
// at http://localhost and uses config.DefaultQueueConfig. Tests in this file
// reference it by pointer, so mutations made through that pointer are visible
// to every other test that uses it.
var cfg = config.RemoteWriteConfig{
Name: "dev",
URL: &config_util.URL{
URL: &url.URL{
Scheme: "http",
Host: "localhost",
},
},
QueueConfig: config.DefaultQueueConfig,
}
// TestNoDuplicateWriteConfigs applies several pairs of remote write configs
// that all target the same endpoint and checks whether WriteStorage.ApplyConfig
// reports an error: pairs that are the very same config must fail, pairs that
// differ by name must succeed.
func TestNoDuplicateWriteConfigs(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "TestNoDuplicateWriteConfigs")
	testutil.Ok(t, err)
	defer os.RemoveAll(tmpDir)

	// newConfig builds a remote write config for http://localhost with the
	// default queue settings; an empty name leaves the Name field unset.
	newConfig := func(name string) *config.RemoteWriteConfig {
		return &config.RemoteWriteConfig{
			Name: name,
			URL: &config_util.URL{
				URL: &url.URL{Scheme: "http", Host: "localhost"},
			},
			QueueConfig: config.DefaultQueueConfig,
		}
	}
	first := newConfig("write-1")
	second := newConfig("write-2")
	unnamed := newConfig("")

	scenarios := []struct {
		cfgs    []*config.RemoteWriteConfig
		wantErr bool
	}{
		// Two duplicates, we should get an error.
		{cfgs: []*config.RemoteWriteConfig{first, first}, wantErr: true},
		// Duplicates but with different names, we should not get an error.
		{cfgs: []*config.RemoteWriteConfig{first, second}, wantErr: false},
		// Duplicates but one with no name, we should not get an error.
		{cfgs: []*config.RemoteWriteConfig{first, unnamed}, wantErr: false},
		// Duplicates both with no name, we should get an error.
		{cfgs: []*config.RemoteWriteConfig{unnamed, unnamed}, wantErr: true},
	}

	for i := range scenarios {
		sc := scenarios[i]

		// Each scenario gets its own WriteStorage.
		ws := NewWriteStorage(nil, tmpDir, time.Millisecond)
		applyErr := ws.ApplyConfig(&config.Config{
			GlobalConfig:       config.DefaultGlobalConfig,
			RemoteWriteConfigs: sc.cfgs,
		})
		testutil.Equals(t, sc.wantErr, applyErr != nil)

		testutil.Ok(t, ws.Close())
	}
}
// TestRestartOnNameChange applies a remote write config, renames it, applies
// it again and checks that after each apply the queue stored under the
// config's current hash has a client whose name matches the config's name.
func TestRestartOnNameChange(t *testing.T) {
	dir, err := ioutil.TempDir("", "TestRestartOnNameChange")
	testutil.Ok(t, err)
	defer os.RemoveAll(dir)

	// Work on a private copy of the shared fixture. The rename below goes
	// through a pointer, and mutating the package-level cfg would leak the
	// new name into every other test that uses it.
	rwCfg := cfg

	hash, err := toHash(rwCfg)
	testutil.Ok(t, err)

	s := NewWriteStorage(nil, dir, time.Millisecond)
	conf := &config.Config{
		GlobalConfig: config.DefaultGlobalConfig,
		RemoteWriteConfigs: []*config.RemoteWriteConfig{
			&rwCfg,
		},
	}
	testutil.Ok(t, s.ApplyConfig(conf))
	testutil.Equals(t, rwCfg.Name, s.queues[hash].client.Name())

	// Change the queue's name, ensure the queue has been restarted.
	conf.RemoteWriteConfigs[0].Name = "dev-2"
	testutil.Ok(t, s.ApplyConfig(conf))
	// Recompute the hash from the renamed config before looking the queue up.
	hash, err = toHash(rwCfg)
	testutil.Ok(t, err)
	testutil.Equals(t, conf.RemoteWriteConfigs[0].Name, s.queues[hash].client.Name())

	err = s.Close()
	testutil.Ok(t, err)
}
func TestWriteStorageLifecycle(t *testing.T) {
dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle")
testutil.Ok(t, err)
@ -49,23 +178,27 @@ func TestUpdateExternalLabels(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(dir)
s := NewWriteStorage(nil, dir, defaultFlushDeadline)
s := NewWriteStorage(nil, dir, time.Second)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
&config.DefaultRemoteWriteConfig,
&cfg,
},
}
hash, err := toHash(conf.RemoteWriteConfigs[0])
testutil.Ok(t, err)
s.ApplyConfig(conf)
testutil.Equals(t, 1, len(s.queues))
testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels)
testutil.Equals(t, labels.Labels(nil), s.queues[hash].externalLabels)
conf.GlobalConfig.ExternalLabels = externalLabels
hash, err = toHash(conf.RemoteWriteConfigs[0])
testutil.Ok(t, err)
s.ApplyConfig(conf)
testutil.Equals(t, 1, len(s.queues))
testutil.Equals(t, externalLabels, s.queues[0].externalLabels)
testutil.Equals(t, externalLabels, s.queues[hash].externalLabels)
err = s.Close()
testutil.Ok(t, err)
@ -84,13 +217,22 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
&config.DefaultRemoteWriteConfig,
},
}
s.ApplyConfig(conf)
testutil.Equals(t, 1, len(s.queues))
queue := s.queues[0]
// We need to set URLs so that metric creation doesn't panic.
conf.RemoteWriteConfigs[0].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
hash, err := toHash(conf.RemoteWriteConfigs[0])
testutil.Ok(t, err)
s.ApplyConfig(conf)
testutil.Equals(t, 1, len(s.queues))
testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same")
s.ApplyConfig(conf)
testutil.Equals(t, 1, len(s.queues))
_, hashExists := s.queues[hash]
testutil.Assert(t, hashExists, "Queue pointer should have remained the same")
err = s.Close()
testutil.Ok(t, err)
@ -120,9 +262,30 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
// We need to set URLs so that metric creation doesn't panic.
conf.RemoteWriteConfigs[0].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
conf.RemoteWriteConfigs[1].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
conf.RemoteWriteConfigs[2].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
s.ApplyConfig(conf)
testutil.Equals(t, 3, len(s.queues))
q := s.queues[1]
c0Hash, err := toHash(c0)
testutil.Ok(t, err)
c1Hash, err := toHash(c1)
testutil.Ok(t, err)
// q := s.queues[1]
// Update c0 and c2.
c0.RemoteTimeout = model.Duration(40 * time.Second)
@ -134,7 +297,8 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
s.ApplyConfig(conf)
testutil.Equals(t, 3, len(s.queues))
testutil.Assert(t, q == s.queues[1], "Pointer of unchanged queue should have remained the same")
_, hashExists := s.queues[c1Hash]
testutil.Assert(t, hashExists, "Pointer of unchanged queue should have remained the same")
// Delete c0.
conf = &config.Config{
@ -144,7 +308,8 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
s.ApplyConfig(conf)
testutil.Equals(t, 2, len(s.queues))
testutil.Assert(t, q != s.queues[1], "If the index changed, the queue should be stopped and recreated.")
_, hashExists = s.queues[c0Hash]
testutil.Assert(t, !hashExists, "If the index changed, the queue should be stopped and recreated.")
err = s.Close()
testutil.Ok(t, err)