From 593e565688daeca559edcbe276e46c70ac578605 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 2 Apr 2015 20:20:00 +0200 Subject: [PATCH] Allow writing to InfluxDB/OpenTSDB at the same time. --- main.go | 56 ++++++++++++++-------------- storage/remote/influxdb/client.go | 5 +++ storage/remote/opentsdb/client.go | 5 +++ storage/remote/queue_manager.go | 46 ++++++++++++++--------- storage/remote/queue_manager_test.go | 4 ++ storage/storage.go | 19 +++++----- 6 files changed, 80 insertions(+), 55 deletions(-) diff --git a/main.go b/main.go index d84baf66f..c8413f3cf 100644 --- a/main.go +++ b/main.go @@ -51,9 +51,8 @@ var ( persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.") - remoteStorageType = flag.String("storage.remote.type", "", "The type of remote storage to use. Valid values: 'opentsdb', 'influxdb'. If this flag is left empty, no remote storage is used.") - opentsdbURL = flag.String("storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to.") - influxdbURL = flag.String("storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to.") + opentsdbURL = flag.String("storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to. None, if empty.") + influxdbURL = flag.String("storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.") remoteStorageTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") @@ -76,7 +75,7 @@ type prometheus struct { targetManager retrieval.TargetManager notificationHandler *notification.NotificationHandler storage local.Storage - remoteStorageQueue *remote.StorageQueueManager + remoteStorageQueues []*remote.StorageQueueManager webService *web.WebService @@ -122,26 +121,27 @@ func NewPrometheus() *prometheus { } var sampleAppender storage.SampleAppender - var remoteStorageQueue *remote.StorageQueueManager - if *remoteStorageType == "" { - glog.Warningf("No remote storage implementation selected; not sending any samples to long-term storage") + var remoteStorageQueues []*remote.StorageQueueManager + if *opentsdbURL == "" && *influxdbURL == "" { + glog.Warningf("No remote storage URLs provided; not sending any samples to long-term storage") sampleAppender = memStorage } else { - var c remote.StorageClient - switch *remoteStorageType { - case "opentsdb": - c = opentsdb.NewClient(*opentsdbURL, *remoteStorageTimeout) - case "influxdb": - c = influxdb.NewClient(*influxdbURL, *remoteStorageTimeout) - default: - glog.Fatalf("Invalid flag value for 'storage.remote.type': %s", *remoteStorageType) + fanout := storage.Fanout{memStorage} + + addRemoteStorage := func(c remote.StorageClient) { + qm := remote.NewStorageQueueManager(c, 100*1024) + fanout = append(fanout, qm) + remoteStorageQueues = append(remoteStorageQueues, qm) } - remoteStorageQueue = remote.NewStorageQueueManager(c, 100*1024) - sampleAppender = storage.Tee{ - Appender1: remoteStorageQueue, - Appender2: memStorage, + if *opentsdbURL != "" { + addRemoteStorage(opentsdb.NewClient(*opentsdbURL, *remoteStorageTimeout)) } + if *influxdbURL != "" { + addRemoteStorage(influxdb.NewClient(*influxdbURL, *remoteStorageTimeout)) + } + + sampleAppender = fanout } targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels()) @@ -196,7 +196,7 @@ func NewPrometheus() *prometheus { targetManager: targetManager, notificationHandler: notificationHandler, storage: memStorage, - remoteStorageQueue: remoteStorageQueue, + remoteStorageQueues: remoteStorageQueues, webService: webService, } @@ -208,8 +208,8 @@ func NewPrometheus() *prometheus { // down. The method installs an interrupt handler, allowing to trigger a // shutdown by sending SIGTERM to the process. func (p *prometheus) Serve() { - if p.remoteStorageQueue != nil { - go p.remoteStorageQueue.Run() + for _, q := range p.remoteStorageQueues { + go q.Run() } go p.ruleManager.Run() go p.notificationHandler.Run() @@ -239,8 +239,8 @@ func (p *prometheus) Serve() { glog.Error("Error stopping local storage: ", err) } - if p.remoteStorageQueue != nil { - p.remoteStorageQueue.Stop() + for _, q := range p.remoteStorageQueues { + q.Stop() } p.notificationHandler.Stop() @@ -251,8 +251,8 @@ func (p *prometheus) Serve() { func (p *prometheus) Describe(ch chan<- *registry.Desc) { p.notificationHandler.Describe(ch) p.storage.Describe(ch) - if p.remoteStorageQueue != nil { - p.remoteStorageQueue.Describe(ch) + for _, q := range p.remoteStorageQueues { + q.Describe(ch) } } @@ -260,8 +260,8 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) { func (p *prometheus) Collect(ch chan<- registry.Metric) { p.notificationHandler.Collect(ch) p.storage.Collect(ch) - if p.remoteStorageQueue != nil { - p.remoteStorageQueue.Collect(ch) + for _, q := range p.remoteStorageQueues { + q.Collect(ch) } } diff --git a/storage/remote/influxdb/client.go b/storage/remote/influxdb/client.go index 434169bfd..ac2acad33 100644 --- a/storage/remote/influxdb/client.go +++ b/storage/remote/influxdb/client.go @@ -157,3 +157,8 @@ func (c *Client) Store(samples clientmodel.Samples) error { } return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"]) } + +// Name identifies the client as an InfluxDB client. +func (c Client) Name() string { + return "influxdb" +} diff --git a/storage/remote/opentsdb/client.go b/storage/remote/opentsdb/client.go index 9bf12f4dc..84e50977b 100644 --- a/storage/remote/opentsdb/client.go +++ b/storage/remote/opentsdb/client.go @@ -134,3 +134,8 @@ func (c *Client) Store(samples clientmodel.Samples) error { } return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) } + +// Name identifies the client as an OpenTSDB client. +func (c Client) Name() string { + return "opentsdb" +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 5378c9cb2..35f6105cf 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -46,7 +46,10 @@ const ( // StorageClient defines an interface for sending a batch of samples to an // external timeseries database. type StorageClient interface { + // Store stores the given samples in the remote storage. Store(clientmodel.Samples) error + // Name identifies the remote storage implementation. + Name() string } // StorageQueueManager manages a queue of samples to be sent to the Storage @@ -67,6 +70,10 @@ type StorageQueueManager struct { // NewStorageQueueManager builds a new StorageQueueManager. func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager { + constLabels := prometheus.Labels{ + "type": tsdb.Name(), + } + return &StorageQueueManager{ tsdb: tsdb, queue: make(chan *clientmodel.Sample, queueCapacity), @@ -75,36 +82,41 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue samplesCount: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_samples_total", - Help: "Total number of processed samples to be sent to remote storage.", + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_samples_total", + Help: "Total number of processed samples to be sent to remote storage.", + ConstLabels: constLabels, }, []string{result}, ), sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_latency_milliseconds", - Help: "Latency quantiles for sending sample batches to the remote storage.", + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_latency_milliseconds", + Help: "Latency quantiles for sending sample batches to the remote storage.", + ConstLabels: constLabels, }), sendErrors: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_errors_total", - Help: "Total number of errors sending sample batches to the remote storage.", + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_errors_total", + Help: "Total number of errors sending sample batches to the remote storage.", + ConstLabels: constLabels, }), queueLength: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "queue_length", - Help: "The number of processed samples queued to be sent to the remote storage.", + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_length", + Help: "The number of processed samples queued to be sent to the remote storage.", + ConstLabels: constLabels, }), queueCapacity: prometheus.MustNewConstMetric( prometheus.NewDesc( prometheus.BuildFQName(namespace, subsystem, "queue_capacity"), "The capacity of the queue of samples to be sent to the remote storage.", - nil, nil, + nil, + constLabels, ), prometheus.GaugeValue, float64(queueCapacity), diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 2959ac5d6..b308ca433 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -46,6 +46,10 @@ func (c *TestStorageClient) Store(s clientmodel.Samples) error { return nil } +func (c TestStorageClient) Name() string { + return "teststorageclient" +} + func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. diff --git a/storage/storage.go b/storage/storage.go index 4984f9d68..68ac6f649 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -23,16 +23,15 @@ type SampleAppender interface { Append(*clientmodel.Sample) } -// Tee is a SampleAppender that appends every sample to two other +// Fanout is a SampleAppender that appends every sample to a list of other // SampleAppenders. -type Tee struct { - Appender1, Appender2 SampleAppender -} +type Fanout []SampleAppender -// Append implements SampleAppender. It appends the provided sample first -// to Appender1, then to Appender2, waiting for each to return before -// proceeding. -func (t Tee) Append(s *clientmodel.Sample) { - t.Appender1.Append(s) - t.Appender2.Append(s) +// Append implements SampleAppender. It appends the provided sample to all +// SampleAppenders in the Fanout slice and waits for each append to complete +// before proceeding with the next. +func (f Fanout) Append(s *clientmodel.Sample) { + for _, a := range f { + a.Append(s) + } }