mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Review feedback: add yaml struct tags, don't embed queue config.
Also, rename QueueManageConfig to QueueConfig, for consistency with tags.
This commit is contained in:
parent
454b661145
commit
5169f990f9
|
@ -173,11 +173,11 @@ var (
|
||||||
// DefaultRemoteWriteConfig is the default remote write configuration.
|
// DefaultRemoteWriteConfig is the default remote write configuration.
|
||||||
DefaultRemoteWriteConfig = RemoteWriteConfig{
|
DefaultRemoteWriteConfig = RemoteWriteConfig{
|
||||||
RemoteTimeout: model.Duration(30 * time.Second),
|
RemoteTimeout: model.Duration(30 * time.Second),
|
||||||
QueueManagerConfig: DefaultQueueManagerConfig,
|
QueueConfig: DefaultQueueConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultQueueManagerConfig is the default remote queue configuration.
|
// DefaultQueueConfig is the default remote queue configuration.
|
||||||
DefaultQueueManagerConfig = QueueManagerConfig{
|
DefaultQueueConfig = QueueConfig{
|
||||||
// With a maximum of 1000 shards, assuming an average of 100ms remote write
|
// With a maximum of 1000 shards, assuming an average of 100ms remote write
|
||||||
// time and 100 samples per batch, we will be able to push 1M samples/s.
|
// time and 100 samples per batch, we will be able to push 1M samples/s.
|
||||||
MaxShards: 1000,
|
MaxShards: 1000,
|
||||||
|
@ -185,7 +185,7 @@ var (
|
||||||
|
|
||||||
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
|
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
|
||||||
// 1000 shards, this will buffer 100M samples total.
|
// 1000 shards, this will buffer 100M samples total.
|
||||||
QueueCapacity: 100 * 1000,
|
Capacity: 100 * 1000,
|
||||||
BatchSendDeadline: 5 * time.Second,
|
BatchSendDeadline: 5 * time.Second,
|
||||||
|
|
||||||
// Max number of times to retry a batch on recoverable errors.
|
// Max number of times to retry a batch on recoverable errors.
|
||||||
|
@ -1411,7 +1411,7 @@ type RemoteWriteConfig struct {
|
||||||
// We cannot do proper Go type embedding below as the parser will then parse
|
// We cannot do proper Go type embedding below as the parser will then parse
|
||||||
// values arbitrarily into the overflow maps of further-down types.
|
// values arbitrarily into the overflow maps of further-down types.
|
||||||
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
|
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
|
||||||
QueueManagerConfig QueueManagerConfig `yaml:",inline"`
|
QueueConfig QueueConfig `yaml:"queue_config,omitempty"`
|
||||||
|
|
||||||
// Catches all undefined fields and must be empty after parsing.
|
// Catches all undefined fields and must be empty after parsing.
|
||||||
XXX map[string]interface{} `yaml:",inline"`
|
XXX map[string]interface{} `yaml:",inline"`
|
||||||
|
@ -1438,27 +1438,27 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueManagerConfig is the configuration for the queue used to write to remote
|
// QueueConfig is the configuration for the queue used to write to remote
|
||||||
// storage.
|
// storage.
|
||||||
type QueueManagerConfig struct {
|
type QueueConfig struct {
|
||||||
// Number of samples to buffer per shard before we start dropping them.
|
// Number of samples to buffer per shard before we start dropping them.
|
||||||
QueueCapacity int
|
Capacity int `yaml:"capacity,omitempty"`
|
||||||
|
|
||||||
// Max number of shards, i.e. amount of concurrency.
|
// Max number of shards, i.e. amount of concurrency.
|
||||||
MaxShards int
|
MaxShards int `yaml:"max_shards,omitempty"`
|
||||||
|
|
||||||
// Maximum number of samples per send.
|
// Maximum number of samples per send.
|
||||||
MaxSamplesPerSend int
|
MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
|
||||||
|
|
||||||
// Maximum time sample will wait in buffer.
|
// Maximum time sample will wait in buffer.
|
||||||
BatchSendDeadline time.Duration
|
BatchSendDeadline time.Duration `yaml:"batch_send_deadline,omitempty"`
|
||||||
|
|
||||||
// Max number of times to retry a batch on recoverable errors.
|
// Max number of times to retry a batch on recoverable errors.
|
||||||
MaxRetries int
|
MaxRetries int `yaml:"max_retries,omitempty"`
|
||||||
|
|
||||||
// On recoverable errors, backoff exponentially.
|
// On recoverable errors, backoff exponentially.
|
||||||
MinBackoff time.Duration
|
MinBackoff time.Duration `yaml:"min_backoff,omitempty"`
|
||||||
MaxBackoff time.Duration
|
MaxBackoff time.Duration `yaml:"max_backoff,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteReadConfig is the configuration for reading from remote storage.
|
// RemoteReadConfig is the configuration for reading from remote storage.
|
||||||
|
|
|
@ -66,12 +66,12 @@ var expectedConf = &Config{
|
||||||
Action: RelabelDrop,
|
Action: RelabelDrop,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
QueueManagerConfig: DefaultQueueManagerConfig,
|
QueueConfig: DefaultQueueConfig,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
URL: mustParseURL("http://remote2/push"),
|
URL: mustParseURL("http://remote2/push"),
|
||||||
RemoteTimeout: model.Duration(30 * time.Second),
|
RemoteTimeout: model.Duration(30 * time.Second),
|
||||||
QueueManagerConfig: DefaultQueueManagerConfig,
|
QueueConfig: DefaultQueueConfig,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ type StorageClient interface {
|
||||||
// QueueManager manages a queue of samples to be sent to the Storage
|
// QueueManager manages a queue of samples to be sent to the Storage
|
||||||
// indicated by the provided StorageClient.
|
// indicated by the provided StorageClient.
|
||||||
type QueueManager struct {
|
type QueueManager struct {
|
||||||
cfg config.QueueManagerConfig
|
cfg config.QueueConfig
|
||||||
externalLabels model.LabelSet
|
externalLabels model.LabelSet
|
||||||
relabelConfigs []*config.RelabelConfig
|
relabelConfigs []*config.RelabelConfig
|
||||||
client StorageClient
|
client StorageClient
|
||||||
|
@ -154,7 +154,7 @@ type QueueManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueManager builds a new QueueManager.
|
// NewQueueManager builds a new QueueManager.
|
||||||
func NewQueueManager(cfg config.QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
|
func NewQueueManager(cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
|
||||||
t := &QueueManager{
|
t := &QueueManager{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
externalLabels: externalLabels,
|
externalLabels: externalLabels,
|
||||||
|
@ -173,7 +173,7 @@ func NewQueueManager(cfg config.QueueManagerConfig, externalLabels model.LabelSe
|
||||||
}
|
}
|
||||||
t.shards = t.newShards(t.numShards)
|
t.shards = t.newShards(t.numShards)
|
||||||
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
|
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
|
||||||
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
|
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity))
|
||||||
|
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
@ -359,7 +359,7 @@ type shards struct {
|
||||||
func (t *QueueManager) newShards(numShards int) *shards {
|
func (t *QueueManager) newShards(numShards int) *shards {
|
||||||
queues := make([]chan *model.Sample, numShards)
|
queues := make([]chan *model.Sample, numShards)
|
||||||
for i := 0; i < numShards; i++ {
|
for i := 0; i < numShards; i++ {
|
||||||
queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
|
queues[i] = make(chan *model.Sample, t.cfg.Capacity)
|
||||||
}
|
}
|
||||||
s := &shards{
|
s := &shards{
|
||||||
qm: t,
|
qm: t,
|
||||||
|
|
|
@ -82,7 +82,7 @@ func (c *TestStorageClient) Name() string {
|
||||||
func TestSampleDelivery(t *testing.T) {
|
func TestSampleDelivery(t *testing.T) {
|
||||||
// Let's create an even number of send batches so we don't run into the
|
// Let's create an even number of send batches so we don't run into the
|
||||||
// batch timeout case.
|
// batch timeout case.
|
||||||
n := config.DefaultQueueManagerConfig.QueueCapacity * 2
|
n := config.DefaultQueueConfig.Capacity * 2
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -98,7 +98,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
c.expectSamples(samples[:len(samples)/2])
|
c.expectSamples(samples[:len(samples)/2])
|
||||||
|
|
||||||
cfg := config.DefaultQueueManagerConfig
|
cfg := config.DefaultQueueConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
m := NewQueueManager(cfg, nil, nil, c)
|
m := NewQueueManager(cfg, nil, nil, c)
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
|
|
||||||
func TestSampleDeliveryOrder(t *testing.T) {
|
func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
ts := 10
|
ts := 10
|
||||||
n := config.DefaultQueueManagerConfig.MaxSamplesPerSend * ts
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -134,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
|
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
c.expectSamples(samples)
|
c.expectSamples(samples)
|
||||||
m := NewQueueManager(config.DefaultQueueManagerConfig, nil, nil, c)
|
m := NewQueueManager(config.DefaultQueueConfig, nil, nil, c)
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
|
@ -195,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
||||||
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
||||||
// should be left on the queue.
|
// should be left on the queue.
|
||||||
n := config.DefaultQueueManagerConfig.MaxSamplesPerSend * 2
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -209,9 +209,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
c := NewTestBlockedStorageClient()
|
c := NewTestBlockedStorageClient()
|
||||||
cfg := config.DefaultQueueManagerConfig
|
cfg := config.DefaultQueueConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
cfg.QueueCapacity = n
|
cfg.Capacity = n
|
||||||
m := NewQueueManager(cfg, nil, nil, c)
|
m := NewQueueManager(cfg, nil, nil, c)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -241,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.queueLen() != config.DefaultQueueManagerConfig.MaxSamplesPerSend {
|
if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend {
|
||||||
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
|
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
|
||||||
m.queueLen(),
|
m.queueLen(),
|
||||||
)
|
)
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (w *Writer) ApplyConfig(conf *config.Config) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newQueues = append(newQueues, NewQueueManager(
|
newQueues = append(newQueues, NewQueueManager(
|
||||||
rwConf.QueueManagerConfig,
|
rwConf.QueueConfig,
|
||||||
conf.GlobalConfig.ExternalLabels,
|
conf.GlobalConfig.ExternalLabels,
|
||||||
rwConf.WriteRelabelConfigs,
|
rwConf.WriteRelabelConfigs,
|
||||||
c,
|
c,
|
||||||
|
|
Loading…
Reference in a new issue