diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index a96953166..8aac0fd23 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local/index" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/web" ) @@ -42,13 +43,7 @@ var cfg = struct { notification notification.NotificationHandlerOptions queryEngine promql.EngineOptions web web.Options - - // Remote storage. - remoteStorageTimeout time.Duration - influxdbURL string - influxdbRetentionPolicy string - influxdbDatabase string - opentsdbURL string + remote remote.Options prometheusURL string }{} @@ -167,23 +162,23 @@ func init() { // Remote storage. cfg.fs.StringVar( - &cfg.opentsdbURL, "storage.remote.opentsdb-url", "", + &cfg.remote.OpentsdbURL, "storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to. None, if empty.", ) cfg.fs.StringVar( - &cfg.influxdbURL, "storage.remote.influxdb-url", "", + &cfg.remote.InfluxdbURL, "storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.", ) cfg.fs.StringVar( - &cfg.influxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default", + &cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default", "The InfluxDB retention policy to use.", ) cfg.fs.StringVar( - &cfg.influxdbDatabase, "storage.remote.influxdb.database", "prometheus", + &cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.", ) cfg.fs.DurationVar( - &cfg.remoteStorageTimeout, "storage.remote.timeout", 30*time.Second, + &cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.", ) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 2687d6102..098d155fa 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -28,7 +28,7 @@ import ( "github.com/prometheus/log" - registry "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notification" @@ -38,8 +38,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/remote" - "github.com/prometheus/prometheus/storage/remote/influxdb" - "github.com/prometheus/prometheus/storage/remote/opentsdb" "github.com/prometheus/prometheus/version" "github.com/prometheus/prometheus/web" ) @@ -58,39 +56,17 @@ func Main() int { return 0 } - memStorage := local.NewMemorySeriesStorage(&cfg.storage) - - var ( - sampleAppender storage.SampleAppender - remoteStorageQueues []*remote.StorageQueueManager - ) - if cfg.opentsdbURL == "" && cfg.influxdbURL == "" { - log.Warnf("No remote storage URLs provided; not sending any samples to long-term storage") - sampleAppender = memStorage - } else { - fanout := storage.Fanout{memStorage} - - addRemoteStorage := func(c remote.StorageClient) { - qm := remote.NewStorageQueueManager(c, 100*1024) - fanout = append(fanout, qm) - remoteStorageQueues = append(remoteStorageQueues, qm) - } - - if cfg.opentsdbURL != "" { - addRemoteStorage(opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteStorageTimeout)) - } - if cfg.influxdbURL != "" { - addRemoteStorage(influxdb.NewClient(cfg.influxdbURL, cfg.remoteStorageTimeout, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy)) - } - - sampleAppender = fanout - } - var ( + memStorage = local.NewMemorySeriesStorage(&cfg.storage) + remoteStorage = remote.New(&cfg.remote) + sampleAppender = storage.Fanout{memStorage} notificationHandler = notification.NewNotificationHandler(&cfg.notification) targetManager = retrieval.NewTargetManager(sampleAppender) queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine) ) + if remoteStorage != nil { + sampleAppender = append(sampleAppender, remoteStorage) + } ruleManager := rules.NewManager(&rules.ManagerOptions{ SampleAppender: sampleAppender, @@ -115,7 +91,7 @@ func Main() int { webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web) if !reloadConfig(cfg.configFile, status, targetManager, ruleManager) { - os.Exit(1) + return 1 } // Wait for reload or termination signals. Start the handler for SIGHUP as @@ -142,16 +118,15 @@ func Main() int { } }() - // The storage has to be fully initialized before registering. - registry.MustRegister(memStorage) - registry.MustRegister(notificationHandler) + if remoteStorage != nil { + prometheus.MustRegister(remoteStorage) - for _, q := range remoteStorageQueues { - registry.MustRegister(q) - - go q.Run() - defer q.Stop() + go remoteStorage.Run() + defer remoteStorage.Stop() } + // The storage has to be fully initialized before registering. + prometheus.MustRegister(memStorage) + prometheus.MustRegister(notificationHandler) go ruleManager.Run() defer ruleManager.Stop() diff --git a/storage/remote/remote.go b/storage/remote/remote.go new file mode 100644 index 000000000..5f8e611df --- /dev/null +++ b/storage/remote/remote.go @@ -0,0 +1,91 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "time" + + "github.com/prometheus/prometheus/storage/remote/influxdb" + "github.com/prometheus/prometheus/storage/remote/opentsdb" + + "github.com/prometheus/client_golang/prometheus" + + clientmodel "github.com/prometheus/client_golang/model" +) + +// Storage collects multiple remote storage queues. +type Storage struct { + queues []*StorageQueueManager +} + +// New returns a new remote Storage. +func New(o *Options) *Storage { + s := &Storage{} + if o.OpentsdbURL != "" { + c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout) + s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) + } + if o.InfluxdbURL != "" { + c := influxdb.NewClient(o.InfluxdbURL, o.StorageTimeout, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy) + s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) + } + if len(s.queues) == 0 { + return nil + } + return s +} + +// Options contains configuration parameters for a remote storage. +type Options struct { + StorageTimeout time.Duration + InfluxdbURL string + InfluxdbRetentionPolicy string + InfluxdbDatabase string + OpentsdbURL string +} + +// Run starts the background processing of the storage queues. +func (s *Storage) Run() { + for _, q := range s.queues { + go q.Run() + } +} + +// Stop the background processing of the storage queues. +func (s *Storage) Stop() { + for _, q := range s.queues { + q.Stop() + } +} + +// Append implements storage.SampleAppender. +func (s *Storage) Append(smpl *clientmodel.Sample) { + for _, q := range s.queues { + q.Append(smpl) + } +} + +// Describe implements prometheus.Collector. +func (s *Storage) Describe(ch chan<- *prometheus.Desc) { + for _, q := range s.queues { + q.Describe(ch) + } +} + +// Collect implements prometheus.Collector. +func (s *Storage) Collect(ch chan<- prometheus.Metric) { + for _, q := range s.queues { + q.Collect(ch) + } +}