mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Add HTTP Basic Auth & TLS support to the generic write path. (#1957)
* Add config, HTTP Basic Auth and TLS support to the generic write path. - Move generic write path configuration to the config file - Factor out config.TLSConfig -> tlf.Config translation - Support TLSConfig for generic remote storage - Rename Run to Start, and make it non-blocking. - Dedupe code in httputil for TLS config. - Make remote queue metrics global.
This commit is contained in:
parent
6dda28dbd4
commit
4520e12440
|
@ -211,15 +211,6 @@ func init() {
|
|||
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
|
||||
"The name of the database to use for storing samples in InfluxDB.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.URL, "experimental.storage.remote.url", "",
|
||||
"The URL of the remote endpoint to send samples to. None, if empty. EXPERIMENTAL.",
|
||||
)
|
||||
|
||||
cfg.fs.DurationVar(
|
||||
&cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second,
|
||||
"The timeout to use when sending samples to the remote storage.",
|
||||
)
|
||||
|
||||
// Alertmanager.
|
||||
cfg.fs.Var(
|
||||
|
|
|
@ -97,12 +97,15 @@ func Main() int {
|
|||
log.Errorf("Error initializing remote storage: %s", err)
|
||||
return 1
|
||||
}
|
||||
|
||||
if remoteStorage != nil {
|
||||
sampleAppender = append(sampleAppender, remoteStorage)
|
||||
reloadables = append(reloadables, remoteStorage)
|
||||
}
|
||||
|
||||
reloadableRemoteStorage := remote.NewConfigurable()
|
||||
sampleAppender = append(sampleAppender, reloadableRemoteStorage)
|
||||
reloadables = append(reloadables, reloadableRemoteStorage)
|
||||
|
||||
var (
|
||||
notifier = notifier.New(&cfg.notifier)
|
||||
targetManager = retrieval.NewTargetManager(sampleAppender)
|
||||
|
@ -185,11 +188,12 @@ func Main() int {
|
|||
}()
|
||||
|
||||
if remoteStorage != nil {
|
||||
prometheus.MustRegister(remoteStorage)
|
||||
|
||||
go remoteStorage.Run()
|
||||
remoteStorage.Start()
|
||||
defer remoteStorage.Stop()
|
||||
}
|
||||
|
||||
defer reloadableRemoteStorage.Stop()
|
||||
|
||||
// The storage has to be fully initialized before registering.
|
||||
if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok {
|
||||
prometheus.MustRegister(instrumentedStorage)
|
||||
|
|
|
@ -150,6 +150,11 @@ var (
|
|||
Port: 80,
|
||||
RefreshInterval: model.Duration(5 * time.Minute),
|
||||
}
|
||||
|
||||
// DefaultRemoteWriteConfig is the default remote write configuration.
|
||||
DefaultRemoteWriteConfig = RemoteWriteConfig{
|
||||
RemoteTimeout: model.Duration(30 * time.Second),
|
||||
}
|
||||
)
|
||||
|
||||
// URL is a custom URL type that allows validation at configuration load time.
|
||||
|
@ -187,6 +192,8 @@ type Config struct {
|
|||
RuleFiles []string `yaml:"rule_files,omitempty"`
|
||||
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
|
||||
|
||||
RemoteWriteConfig []RemoteWriteConfig `yaml:"remote_write,omitempty"`
|
||||
|
||||
// Catches all undefined fields and must be empty after parsing.
|
||||
XXX map[string]interface{} `yaml:",inline"`
|
||||
|
||||
|
@ -1065,3 +1072,28 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
|
|||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// RemoteWriteConfig is the configuration for remote storage.
|
||||
type RemoteWriteConfig struct {
|
||||
URL URL `yaml:"url,omitempty"`
|
||||
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
|
||||
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
|
||||
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
|
||||
ProxyURL URL `yaml:"proxy_url,omitempty"`
|
||||
|
||||
// Catches all undefined fields and must be empty after parsing.
|
||||
XXX map[string]interface{} `yaml:",inline"`
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
*c = DefaultRemoteWriteConfig
|
||||
type plain RemoteWriteConfig
|
||||
if err := unmarshal((*plain)(c)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkOverflow(c.XXX, "remote_write"); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -226,28 +226,21 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
|
|||
|
||||
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
|
||||
bearerTokenFile := conf.BearerTokenFile
|
||||
caFile := conf.TLSConfig.CAFile
|
||||
tlsConfig := conf.TLSConfig
|
||||
if conf.InCluster {
|
||||
if len(bearerTokenFile) == 0 {
|
||||
bearerTokenFile = serviceAccountToken
|
||||
}
|
||||
if len(caFile) == 0 {
|
||||
if len(tlsConfig.CAFile) == 0 {
|
||||
// With recent versions, the CA certificate is mounted as a secret
|
||||
// but we need to handle older versions too. In this case, don't
|
||||
// set the CAFile & the configuration will have to use InsecureSkipVerify.
|
||||
if _, err := os.Stat(serviceAccountCACert); err == nil {
|
||||
caFile = serviceAccountCACert
|
||||
tlsConfig.CAFile = serviceAccountCACert
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tlsOpts := httputil.TLSOptions{
|
||||
InsecureSkipVerify: conf.TLSConfig.InsecureSkipVerify,
|
||||
CAFile: caFile,
|
||||
CertFile: conf.TLSConfig.CertFile,
|
||||
KeyFile: conf.TLSConfig.KeyFile,
|
||||
}
|
||||
tlsConfig, err := httputil.NewTLSConfig(tlsOpts)
|
||||
tls, err := httputil.NewTLSConfig(tlsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -257,7 +250,7 @@ func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, err
|
|||
c, err = net.DialTimeout(netw, addr, time.Duration(conf.RequestTimeout))
|
||||
return
|
||||
},
|
||||
TLSClientConfig: tlsConfig,
|
||||
TLSClientConfig: tls,
|
||||
}
|
||||
|
||||
// If a bearer token is provided, create a round tripper that will set the
|
||||
|
|
|
@ -68,18 +68,7 @@ func NewTarget(labels, metaLabels model.LabelSet, params url.Values) *Target {
|
|||
|
||||
// NewHTTPClient returns a new HTTP client configured for the given scrape configuration.
|
||||
func NewHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
|
||||
tlsOpts := httputil.TLSOptions{
|
||||
InsecureSkipVerify: cfg.TLSConfig.InsecureSkipVerify,
|
||||
CAFile: cfg.TLSConfig.CAFile,
|
||||
}
|
||||
if len(cfg.TLSConfig.CertFile) > 0 && len(cfg.TLSConfig.KeyFile) > 0 {
|
||||
tlsOpts.CertFile = cfg.TLSConfig.CertFile
|
||||
tlsOpts.KeyFile = cfg.TLSConfig.KeyFile
|
||||
}
|
||||
if len(cfg.TLSConfig.ServerName) > 0 {
|
||||
tlsOpts.ServerName = cfg.TLSConfig.ServerName
|
||||
}
|
||||
tlsConfig, err := httputil.NewTLSConfig(tlsOpts)
|
||||
tlsConfig, err := httputil.NewTLSConfig(cfg.TLSConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -21,22 +21,45 @@ import (
|
|||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
)
|
||||
|
||||
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
|
||||
type Client struct {
|
||||
url string
|
||||
client http.Client
|
||||
index int // Used to differentiate metrics
|
||||
url config.URL
|
||||
client *http.Client
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// NewClient creates a new Client.
|
||||
func NewClient(url string, timeout time.Duration) (*Client, error) {
|
||||
func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
|
||||
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The only timeout we care about is the configured push timeout.
|
||||
// It is applied on request. So we leave out any timings here.
|
||||
var rt http.RoundTripper = &http.Transport{
|
||||
Proxy: http.ProxyURL(conf.ProxyURL.URL),
|
||||
TLSClientConfig: tlsConfig,
|
||||
}
|
||||
|
||||
if conf.BasicAuth != nil {
|
||||
rt = httputil.NewBasicAuthRoundTripper(conf.BasicAuth.Username, conf.BasicAuth.Password, rt)
|
||||
}
|
||||
|
||||
return &Client{
|
||||
url: url,
|
||||
client: http.Client{
|
||||
Timeout: timeout,
|
||||
},
|
||||
index: index,
|
||||
url: conf.URL,
|
||||
client: httputil.NewClient(rt),
|
||||
timeout: time.Duration(conf.RemoteTimeout),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -75,12 +98,14 @@ func (c *Client) Store(samples model.Samples) error {
|
|||
return err
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequest("POST", c.url, &buf)
|
||||
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||
httpResp, err := c.client.Do(httpReq)
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), c.timeout)
|
||||
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -97,5 +122,5 @@ func (c *Client) Store(samples model.Samples) error {
|
|||
// will simply be removed in the restructuring and the whole "generic" naming
|
||||
// will be gone for good.
|
||||
func (c Client) Name() string {
|
||||
return "generic"
|
||||
return fmt.Sprintf("generic:%d:%s", c.index, c.url)
|
||||
}
|
||||
|
|
|
@ -26,13 +26,76 @@ import (
|
|||
const (
|
||||
namespace = "prometheus"
|
||||
subsystem = "remote_storage"
|
||||
|
||||
result = "result"
|
||||
success = "success"
|
||||
failure = "failure"
|
||||
dropped = "dropped"
|
||||
queue = "queue"
|
||||
)
|
||||
|
||||
var (
|
||||
sentSamplesTotal = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sent_samples_total",
|
||||
Help: "Total number of processed samples sent to remote storage.",
|
||||
},
|
||||
[]string{queue},
|
||||
)
|
||||
failedSamplesTotal = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "failed_samples_total",
|
||||
Help: "Total number of processed samples which failed on send to remote storage.",
|
||||
},
|
||||
[]string{queue},
|
||||
)
|
||||
droppedSamplesTotal = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "dropped_samples_total",
|
||||
Help: "Total number of samples which were dropped due to the queue being full.",
|
||||
},
|
||||
[]string{queue},
|
||||
)
|
||||
sentBatchDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sent_batch_duration_seconds",
|
||||
Help: "Duration of sample batch send calls to the remote storage.",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
},
|
||||
[]string{queue},
|
||||
)
|
||||
queueLength = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queue_length",
|
||||
Help: "The number of processed samples queued to be sent to the remote storage.",
|
||||
},
|
||||
[]string{queue},
|
||||
)
|
||||
queueCapacity = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queue_capacity",
|
||||
Help: "The capacity of the queue of samples to be sent to the remote storage.",
|
||||
},
|
||||
[]string{queue},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(sentSamplesTotal)
|
||||
prometheus.MustRegister(failedSamplesTotal)
|
||||
prometheus.MustRegister(droppedSamplesTotal)
|
||||
prometheus.MustRegister(sentBatchDuration)
|
||||
prometheus.MustRegister(queueLength)
|
||||
prometheus.MustRegister(queueCapacity)
|
||||
}
|
||||
|
||||
// StorageClient defines an interface for sending a batch of samples to an
|
||||
// external timeseries database.
|
||||
type StorageClient interface {
|
||||
|
@ -59,24 +122,16 @@ var defaultConfig = StorageQueueManagerConfig{
|
|||
// StorageQueueManager manages a queue of samples to be sent to the Storage
|
||||
// indicated by the provided StorageClient.
|
||||
type StorageQueueManager struct {
|
||||
cfg StorageQueueManagerConfig
|
||||
tsdb StorageClient
|
||||
shards []chan *model.Sample
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
||||
sentSamplesTotal *prometheus.CounterVec
|
||||
sentBatchDuration *prometheus.HistogramVec
|
||||
queueLength prometheus.Gauge
|
||||
queueCapacity prometheus.Metric
|
||||
cfg StorageQueueManagerConfig
|
||||
tsdb StorageClient
|
||||
shards []chan *model.Sample
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
queueName string
|
||||
}
|
||||
|
||||
// NewStorageQueueManager builds a new StorageQueueManager.
|
||||
func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager {
|
||||
constLabels := prometheus.Labels{
|
||||
"type": tsdb.Name(),
|
||||
}
|
||||
|
||||
if cfg == nil {
|
||||
cfg = &defaultConfig
|
||||
}
|
||||
|
@ -87,51 +142,14 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig)
|
|||
}
|
||||
|
||||
t := &StorageQueueManager{
|
||||
cfg: *cfg,
|
||||
tsdb: tsdb,
|
||||
shards: shards,
|
||||
done: make(chan struct{}),
|
||||
|
||||
sentSamplesTotal: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sent_samples_total",
|
||||
Help: "Total number of processed samples sent to remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
sentBatchDuration: prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sent_batch_duration_seconds",
|
||||
Help: "Duration of sample batch send calls to the remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
Buckets: prometheus.DefBuckets,
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queue_length",
|
||||
Help: "The number of processed samples queued to be sent to the remote storage.",
|
||||
ConstLabels: constLabels,
|
||||
}),
|
||||
queueCapacity: prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
|
||||
"The capacity of the queue of samples to be sent to the remote storage.",
|
||||
nil,
|
||||
constLabels,
|
||||
),
|
||||
prometheus.GaugeValue,
|
||||
float64(cfg.QueueCapacity*cfg.Shards),
|
||||
),
|
||||
cfg: *cfg,
|
||||
tsdb: tsdb,
|
||||
shards: shards,
|
||||
done: make(chan struct{}),
|
||||
queueName: tsdb.Name(),
|
||||
}
|
||||
|
||||
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
|
||||
t.wg.Add(cfg.Shards)
|
||||
return t
|
||||
}
|
||||
|
@ -145,8 +163,9 @@ func (t *StorageQueueManager) Append(s *model.Sample) error {
|
|||
|
||||
select {
|
||||
case t.shards[shard] <- s:
|
||||
queueLength.WithLabelValues(t.queueName).Inc()
|
||||
default:
|
||||
t.sentSamplesTotal.WithLabelValues(dropped).Inc()
|
||||
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
|
||||
log.Warn("Remote storage queue full, discarding sample.")
|
||||
}
|
||||
return nil
|
||||
|
@ -159,38 +178,12 @@ func (*StorageQueueManager) NeedsThrottling() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
|
||||
t.sentSamplesTotal.Describe(ch)
|
||||
t.sentBatchDuration.Describe(ch)
|
||||
ch <- t.queueLength.Desc()
|
||||
ch <- t.queueCapacity.Desc()
|
||||
}
|
||||
|
||||
// QueueLength returns the number of outstanding samples in the queue.
|
||||
func (t *StorageQueueManager) queueLen() int {
|
||||
queueLength := 0
|
||||
for _, shard := range t.shards {
|
||||
queueLength += len(shard)
|
||||
}
|
||||
return queueLength
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
|
||||
t.sentSamplesTotal.Collect(ch)
|
||||
t.sentBatchDuration.Collect(ch)
|
||||
t.queueLength.Set(float64(t.queueLen()))
|
||||
ch <- t.queueLength
|
||||
ch <- t.queueCapacity
|
||||
}
|
||||
|
||||
// Run continuously sends samples to the remote storage.
|
||||
func (t *StorageQueueManager) Run() {
|
||||
// Start the queue manager sending samples to the remote storage.
|
||||
// Does not block.
|
||||
func (t *StorageQueueManager) Start() {
|
||||
for i := 0; i < t.cfg.Shards; i++ {
|
||||
go t.runShard(i)
|
||||
}
|
||||
t.wg.Wait()
|
||||
}
|
||||
|
||||
// Stop stops sending samples to the remote storage and waits for pending
|
||||
|
@ -225,6 +218,7 @@ func (t *StorageQueueManager) runShard(i int) {
|
|||
return
|
||||
}
|
||||
|
||||
queueLength.WithLabelValues(t.queueName).Dec()
|
||||
pendingSamples = append(pendingSamples, s)
|
||||
|
||||
for len(pendingSamples) >= t.cfg.MaxSamplesPerSend {
|
||||
|
@ -248,11 +242,11 @@ func (t *StorageQueueManager) sendSamples(s model.Samples) {
|
|||
err := t.tsdb.Store(s)
|
||||
duration := time.Since(begin).Seconds()
|
||||
|
||||
labelValue := success
|
||||
if err != nil {
|
||||
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
|
||||
labelValue = failure
|
||||
failedSamplesTotal.WithLabelValues(t.queueName).Add(float64(len(s)))
|
||||
} else {
|
||||
sentSamplesTotal.WithLabelValues(t.queueName).Add(float64(len(s)))
|
||||
}
|
||||
t.sentSamplesTotal.WithLabelValues(labelValue).Add(float64(len(s)))
|
||||
t.sentBatchDuration.WithLabelValues(labelValue).Observe(duration)
|
||||
sentBatchDuration.WithLabelValues(t.queueName).Observe(duration)
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ func TestSampleDelivery(t *testing.T) {
|
|||
for _, s := range samples[len(samples)/2:] {
|
||||
m.Append(s)
|
||||
}
|
||||
go m.Run()
|
||||
m.Start()
|
||||
defer m.Stop()
|
||||
|
||||
c.waitForExpectedSamples(t)
|
||||
|
@ -141,7 +141,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
|||
for _, s := range samples {
|
||||
m.Append(s)
|
||||
}
|
||||
go m.Run()
|
||||
m.Start()
|
||||
defer m.Stop()
|
||||
|
||||
c.waitForExpectedSamples(t)
|
||||
|
@ -181,6 +181,14 @@ func (c *TestBlockingStorageClient) Name() string {
|
|||
return "testblockingstorageclient"
|
||||
}
|
||||
|
||||
func (t *StorageQueueManager) queueLen() int {
|
||||
queueLength := 0
|
||||
for _, shard := range t.shards {
|
||||
queueLength += len(shard)
|
||||
}
|
||||
return queueLength
|
||||
}
|
||||
|
||||
func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||
// Our goal is to fully empty the queue:
|
||||
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
||||
|
@ -204,7 +212,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
|||
c := NewTestBlockedStorageClient()
|
||||
m := NewStorageQueueManager(c, &cfg)
|
||||
|
||||
go m.Run()
|
||||
m.Start()
|
||||
|
||||
defer func() {
|
||||
c.unlock()
|
||||
|
|
|
@ -69,13 +69,6 @@ func New(o *Options) (*Storage, error) {
|
|||
prometheus.MustRegister(c)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if o.URL != "" {
|
||||
c, err := NewClient(o.URL, o.StorageTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if len(s.queues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -94,15 +87,12 @@ type Options struct {
|
|||
GraphiteAddress string
|
||||
GraphiteTransport string
|
||||
GraphitePrefix string
|
||||
// TODO: This just being called "URL" will make more sense once the
|
||||
// other remote storage mechanisms are removed.
|
||||
URL string
|
||||
}
|
||||
|
||||
// Run starts the background processing of the storage queues.
|
||||
func (s *Storage) Run() {
|
||||
func (s *Storage) Start() {
|
||||
for _, q := range s.queues {
|
||||
go q.Run()
|
||||
q.Start()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,17 +130,3 @@ func (s *Storage) Append(smpl *model.Sample) error {
|
|||
func (s *Storage) NeedsThrottling() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (s *Storage) Describe(ch chan<- *prometheus.Desc) {
|
||||
for _, q := range s.queues {
|
||||
q.Describe(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (s *Storage) Collect(ch chan<- prometheus.Metric) {
|
||||
for _, q := range s.queues {
|
||||
q.Collect(ch)
|
||||
}
|
||||
}
|
||||
|
|
99
storage/remote/remote_reloadable.go
Normal file
99
storage/remote/remote_reloadable.go
Normal file
|
@ -0,0 +1,99 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
)
|
||||
|
||||
// Storage collects multiple remote storage queues.
|
||||
type ReloadableStorage struct {
|
||||
mtx sync.RWMutex
|
||||
externalLabels model.LabelSet
|
||||
conf []config.RemoteWriteConfig
|
||||
|
||||
queues []*StorageQueueManager
|
||||
}
|
||||
|
||||
// New returns a new remote Storage.
|
||||
func NewConfigurable() *ReloadableStorage {
|
||||
return &ReloadableStorage{}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
newQueues := []*StorageQueueManager{}
|
||||
for i, c := range conf.RemoteWriteConfig {
|
||||
c, err := NewClient(i, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newQueues = append(newQueues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
|
||||
for _, q := range s.queues {
|
||||
q.Stop()
|
||||
}
|
||||
s.queues = newQueues
|
||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||
s.conf = conf.RemoteWriteConfig
|
||||
for _, q := range s.queues {
|
||||
q.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the background processing of the storage queues.
|
||||
func (s *ReloadableStorage) Stop() {
|
||||
for _, q := range s.queues {
|
||||
q.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Append implements storage.SampleAppender. Always returns nil.
|
||||
func (s *ReloadableStorage) Append(smpl *model.Sample) error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
var snew model.Sample
|
||||
snew = *smpl
|
||||
snew.Metric = smpl.Metric.Clone()
|
||||
|
||||
for ln, lv := range s.externalLabels {
|
||||
if _, ok := smpl.Metric[ln]; !ok {
|
||||
snew.Metric[ln] = lv
|
||||
}
|
||||
}
|
||||
|
||||
for _, q := range s.queues {
|
||||
q.Append(&snew)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NeedsThrottling implements storage.SampleAppender. It will always return
|
||||
// false as a remote storage drops samples on the floor if backlogging instead
|
||||
// of asking for throttling.
|
||||
func (s *ReloadableStorage) NeedsThrottling() bool {
|
||||
return false
|
||||
}
|
|
@ -22,6 +22,8 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
)
|
||||
|
||||
// NewClient returns a http.Client using the specified http.RoundTripper.
|
||||
|
@ -117,38 +119,30 @@ func cloneRequest(r *http.Request) *http.Request {
|
|||
return r2
|
||||
}
|
||||
|
||||
type TLSOptions struct {
|
||||
InsecureSkipVerify bool
|
||||
CAFile string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
ServerName string
|
||||
}
|
||||
|
||||
func NewTLSConfig(opts TLSOptions) (*tls.Config, error) {
|
||||
tlsConfig := &tls.Config{InsecureSkipVerify: opts.InsecureSkipVerify}
|
||||
func NewTLSConfig(cfg config.TLSConfig) (*tls.Config, error) {
|
||||
tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}
|
||||
|
||||
// If a CA cert is provided then let's read it in so we can validate the
|
||||
// scrape target's certificate properly.
|
||||
if len(opts.CAFile) > 0 {
|
||||
if len(cfg.CAFile) > 0 {
|
||||
caCertPool := x509.NewCertPool()
|
||||
// Load CA cert.
|
||||
caCert, err := ioutil.ReadFile(opts.CAFile)
|
||||
caCert, err := ioutil.ReadFile(cfg.CAFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to use specified CA cert %s: %s", opts.CAFile, err)
|
||||
return nil, fmt.Errorf("unable to use specified CA cert %s: %s", cfg.CAFile, err)
|
||||
}
|
||||
caCertPool.AppendCertsFromPEM(caCert)
|
||||
tlsConfig.RootCAs = caCertPool
|
||||
}
|
||||
|
||||
if len(opts.ServerName) > 0 {
|
||||
tlsConfig.ServerName = opts.ServerName
|
||||
if len(cfg.ServerName) > 0 {
|
||||
tlsConfig.ServerName = cfg.ServerName
|
||||
}
|
||||
// If a client cert & key is provided then configure TLS config accordingly.
|
||||
if len(opts.CertFile) > 0 && len(opts.KeyFile) > 0 {
|
||||
cert, err := tls.LoadX509KeyPair(opts.CertFile, opts.KeyFile)
|
||||
if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
|
||||
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", opts.CertFile, opts.KeyFile, err)
|
||||
return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", cfg.CertFile, cfg.KeyFile, err)
|
||||
}
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue