main: cleanup initialization of remote storage.

This commit is contained in:
Fabian Reinartz 2015-06-23 18:04:04 +02:00
parent e18bc94980
commit 23e77450ff
3 changed files with 113 additions and 52 deletions

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
) )
@ -42,13 +43,7 @@ var cfg = struct {
notification notification.NotificationHandlerOptions notification notification.NotificationHandlerOptions
queryEngine promql.EngineOptions queryEngine promql.EngineOptions
web web.Options web web.Options
remote remote.Options
// Remote storage.
remoteStorageTimeout time.Duration
influxdbURL string
influxdbRetentionPolicy string
influxdbDatabase string
opentsdbURL string
prometheusURL string prometheusURL string
}{} }{}
@ -167,23 +162,23 @@ func init() {
// Remote storage. // Remote storage.
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.opentsdbURL, "storage.remote.opentsdb-url", "", &cfg.remote.OpentsdbURL, "storage.remote.opentsdb-url", "",
"The URL of the remote OpenTSDB server to send samples to. None, if empty.", "The URL of the remote OpenTSDB server to send samples to. None, if empty.",
) )
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.influxdbURL, "storage.remote.influxdb-url", "", &cfg.remote.InfluxdbURL, "storage.remote.influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.", "The URL of the remote InfluxDB server to send samples to. None, if empty.",
) )
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.influxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default", &cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
"The InfluxDB retention policy to use.", "The InfluxDB retention policy to use.",
) )
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.influxdbDatabase, "storage.remote.influxdb.database", "prometheus", &cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
"The name of the database to use for storing samples in InfluxDB.", "The name of the database to use for storing samples in InfluxDB.",
) )
cfg.fs.DurationVar( cfg.fs.DurationVar(
&cfg.remoteStorageTimeout, "storage.remote.timeout", 30*time.Second, &cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second,
"The timeout to use when sending samples to the remote storage.", "The timeout to use when sending samples to the remote storage.",
) )

View file

@ -28,7 +28,7 @@ import (
"github.com/prometheus/log" "github.com/prometheus/log"
registry "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notification" "github.com/prometheus/prometheus/notification"
@ -38,8 +38,6 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/prometheus/version" "github.com/prometheus/prometheus/version"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
) )
@ -58,39 +56,17 @@ func Main() int {
return 0 return 0
} }
memStorage := local.NewMemorySeriesStorage(&cfg.storage)
var (
sampleAppender storage.SampleAppender
remoteStorageQueues []*remote.StorageQueueManager
)
if cfg.opentsdbURL == "" && cfg.influxdbURL == "" {
log.Warnf("No remote storage URLs provided; not sending any samples to long-term storage")
sampleAppender = memStorage
} else {
fanout := storage.Fanout{memStorage}
addRemoteStorage := func(c remote.StorageClient) {
qm := remote.NewStorageQueueManager(c, 100*1024)
fanout = append(fanout, qm)
remoteStorageQueues = append(remoteStorageQueues, qm)
}
if cfg.opentsdbURL != "" {
addRemoteStorage(opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteStorageTimeout))
}
if cfg.influxdbURL != "" {
addRemoteStorage(influxdb.NewClient(cfg.influxdbURL, cfg.remoteStorageTimeout, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy))
}
sampleAppender = fanout
}
var ( var (
memStorage = local.NewMemorySeriesStorage(&cfg.storage)
remoteStorage = remote.New(&cfg.remote)
sampleAppender = storage.Fanout{memStorage}
notificationHandler = notification.NewNotificationHandler(&cfg.notification) notificationHandler = notification.NewNotificationHandler(&cfg.notification)
targetManager = retrieval.NewTargetManager(sampleAppender) targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine) queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine)
) )
if remoteStorage != nil {
sampleAppender = append(sampleAppender, remoteStorage)
}
ruleManager := rules.NewManager(&rules.ManagerOptions{ ruleManager := rules.NewManager(&rules.ManagerOptions{
SampleAppender: sampleAppender, SampleAppender: sampleAppender,
@ -115,7 +91,7 @@ func Main() int {
webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web) webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web)
if !reloadConfig(cfg.configFile, status, targetManager, ruleManager) { if !reloadConfig(cfg.configFile, status, targetManager, ruleManager) {
os.Exit(1) return 1
} }
// Wait for reload or termination signals. Start the handler for SIGHUP as // Wait for reload or termination signals. Start the handler for SIGHUP as
@ -142,16 +118,15 @@ func Main() int {
} }
}() }()
// The storage has to be fully initialized before registering. if remoteStorage != nil {
registry.MustRegister(memStorage) prometheus.MustRegister(remoteStorage)
registry.MustRegister(notificationHandler)
for _, q := range remoteStorageQueues { go remoteStorage.Run()
registry.MustRegister(q) defer remoteStorage.Stop()
go q.Run()
defer q.Stop()
} }
// The storage has to be fully initialized before registering.
prometheus.MustRegister(memStorage)
prometheus.MustRegister(notificationHandler)
go ruleManager.Run() go ruleManager.Run()
defer ruleManager.Stop() defer ruleManager.Stop()

91
storage/remote/remote.go Normal file
View file

@ -0,0 +1,91 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"time"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model"
)
// Storage collects multiple remote storage queues.
type Storage struct {
queues []*StorageQueueManager
}
// New returns a new remote Storage.
func New(o *Options) *Storage {
s := &Storage{}
if o.OpentsdbURL != "" {
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if o.InfluxdbURL != "" {
c := influxdb.NewClient(o.InfluxdbURL, o.StorageTimeout, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if len(s.queues) == 0 {
return nil
}
return s
}
// Options contains configuration parameters for a remote storage.
type Options struct {
StorageTimeout time.Duration
InfluxdbURL string
InfluxdbRetentionPolicy string
InfluxdbDatabase string
OpentsdbURL string
}
// Run starts the background processing of the storage queues.
func (s *Storage) Run() {
for _, q := range s.queues {
go q.Run()
}
}
// Stop the background processing of the storage queues.
func (s *Storage) Stop() {
for _, q := range s.queues {
q.Stop()
}
}
// Append implements storage.SampleAppender.
func (s *Storage) Append(smpl *clientmodel.Sample) {
for _, q := range s.queues {
q.Append(smpl)
}
}
// Describe implements prometheus.Collector.
func (s *Storage) Describe(ch chan<- *prometheus.Desc) {
for _, q := range s.queues {
q.Describe(ch)
}
}
// Collect implements prometheus.Collector.
func (s *Storage) Collect(ch chan<- prometheus.Metric) {
for _, q := range s.queues {
q.Collect(ch)
}
}