From a38a54fa117442c8d8c80020421941bf5c4145be Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 31 May 2019 19:39:40 -0600 Subject: [PATCH] Split remote write storage into its own type This allows other processes to reuse just the remote write code without having to use the remote read code as well. This will be used to create a sidecar capable of sending remote write payloads. Signed-off-by: Chris Marchbanks --- storage/remote/storage.go | 98 +++----------------------- storage/remote/storage_test.go | 58 +-------------- storage/remote/write.go | 124 +++++++++++++++++++++++++++++++-- storage/remote/write_test.go | 95 +++++++++++++++++++++++++ 4 files changed, 224 insertions(+), 151 deletions(-) create mode 100644 storage/remote/write_test.go diff --git a/storage/remote/storage.go b/storage/remote/storage.go index d1e83fdf6d..444c04954c 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -15,13 +15,10 @@ package remote import ( "context" - "crypto/md5" - "encoding/json" "sync" "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -40,13 +37,7 @@ type Storage struct { logger log.Logger mtx sync.Mutex - configHash [16]byte - - // For writes - walDir string - queues []*QueueManager - samplesIn *ewmaRate - flushDeadline time.Duration + rws *WriteStorage // For reads queryables []storage.Queryable @@ -61,28 +52,17 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal s := &Storage{ logger: logging.Dedupe(l, 1*time.Minute), localStartTimeCallback: stCallback, - flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - walDir: walDir, + rws: NewWriteStorage(l, walDir, flushDeadline), } - go s.run() return s } -func (s *Storage) run() { - ticker := time.NewTicker(shardUpdateDuration) - defer ticker.Stop() - for range ticker.C { - s.samplesIn.tick() - } -} - // ApplyConfig updates the state as the new config requires. func (s *Storage) ApplyConfig(conf *config.Config) error { s.mtx.Lock() defer s.mtx.Unlock() - if err := s.applyRemoteWriteConfig(conf); err != nil { + if err := s.rws.ApplyConfig(conf); err != nil { return err } @@ -113,66 +93,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { return nil } -// applyRemoteWriteConfig applies the remote write config only if the config has changed. -// The caller must hold the lock on s.mtx. -func (s *Storage) applyRemoteWriteConfig(conf *config.Config) error { - // Remote write queues only need to change if the remote write config or - // external labels change. Hash these together and only reload if the hash - // changes. - cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) - if err != nil { - return err - } - externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) - if err != nil { - return err - } - - hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) - if hash == s.configHash { - level.Debug(s.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") - return nil - } - - s.configHash = hash - - // Update write queues - newQueues := []*QueueManager{} - // TODO: we should only stop & recreate queues which have changes, - // as this can be quite disruptive. - for i, rwConf := range conf.RemoteWriteConfigs { - c, err := NewClient(i, &ClientConfig{ - URL: rwConf.URL, - Timeout: rwConf.RemoteTimeout, - HTTPClientConfig: rwConf.HTTPClientConfig, - }) - if err != nil { - return err - } - newQueues = append(newQueues, NewQueueManager( - s.logger, - s.walDir, - s.samplesIn, - rwConf.QueueConfig, - conf.GlobalConfig.ExternalLabels, - rwConf.WriteRelabelConfigs, - c, - s.flushDeadline, - )) - } - - for _, q := range s.queues { - q.Stop() - } - - s.queues = newQueues - for _, q := range s.queues { - q.Start() - } - - return nil -} - // StartTime implements the Storage interface. func (s *Storage) StartTime() (int64, error) { return int64(model.Latest), nil @@ -196,16 +116,16 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie return storage.NewMergeQuerier(nil, queriers), nil } +// Appender implements storage.Storage. +func (s *Storage) Appender() (storage.Appender, error) { + return s.rws.Appender() +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() defer s.mtx.Unlock() - - for _, q := range s.queues { - q.Stop() - } - - return nil + return s.rws.Close() } func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher { diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index 285b4e3bd6..b03639253d 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/util/testutil" ) @@ -42,7 +41,7 @@ func TestStorageLifecycle(t *testing.T) { s.ApplyConfig(conf) // make sure remote write has a queue. - testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, 1, len(s.rws.queues)) // make sure remote write has a queue. testutil.Equals(t, 1, len(s.queryables)) @@ -51,33 +50,6 @@ func TestStorageLifecycle(t *testing.T) { testutil.Ok(t, err) } -func TestUpdateExternalLabels(t *testing.T) { - dir, err := ioutil.TempDir("", "TestUpdateExternalLabels") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) - - externalLabels := labels.FromStrings("external", "true") - conf := &config.Config{ - GlobalConfig: config.GlobalConfig{}, - RemoteWriteConfigs: []*config.RemoteWriteConfig{ - &config.DefaultRemoteWriteConfig, - }, - } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels) - - conf.GlobalConfig.ExternalLabels = externalLabels - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - testutil.Equals(t, externalLabels, s.queues[0].externalLabels) - - err = s.Close() - testutil.Ok(t, err) -} - func TestUpdateRemoteReadConfigs(t *testing.T) { dir, err := ioutil.TempDir("", "TestUpdateRemoteReadConfigs") testutil.Ok(t, err) @@ -100,31 +72,3 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { err = s.Close() testutil.Ok(t, err) } - -func TestUpdateRemoteWriteConfigsNoop(t *testing.T) { - dir, err := ioutil.TempDir("", "TestUpdateRemoteWriteConfigsNoop") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) - - conf := &config.Config{ - GlobalConfig: config.GlobalConfig{}, - RemoteWriteConfigs: []*config.RemoteWriteConfig{ - &config.DefaultRemoteWriteConfig, - }, - } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - queue := s.queues[0] - - conf.RemoteReadConfigs = []*config.RemoteReadConfig{ - &config.DefaultRemoteReadConfig, - } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same") - - err = s.Close() - testutil.Ok(t, err) -} diff --git a/storage/remote/write.go b/storage/remote/write.go index b1e3ee9fd9..dba8b1fd5f 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -14,8 +14,16 @@ package remote import ( + "crypto/md5" + "encoding/json" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -37,15 +45,121 @@ var ( } ) -// Appender implements scrape.Appendable. -func (s *Storage) Appender() (storage.Appender, error) { +// WriteStorage represents all the remote write storage. +type WriteStorage struct { + logger log.Logger + mtx sync.Mutex + + configHash [16]byte + walDir string + queues []*QueueManager + samplesIn *ewmaRate + flushDeadline time.Duration +} + +// NewWriteStorage creates and runs a WriteStorage. +func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Duration) *WriteStorage { + if logger == nil { + logger = log.NewNopLogger() + } + rws := &WriteStorage{ + logger: logger, + flushDeadline: flushDeadline, + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + walDir: walDir, + } + go rws.run() + return rws +} + +func (rws *WriteStorage) run() { + ticker := time.NewTicker(shardUpdateDuration) + defer ticker.Stop() + for range ticker.C { + rws.samplesIn.tick() + } +} + +// ApplyConfig updates the state as the new config requires. +func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { + rws.mtx.Lock() + defer rws.mtx.Unlock() + + // Remote write queues only need to change if the remote write config or + // external labels change. Hash these together and only reload if the hash + // changes. + cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) + if err != nil { + return err + } + externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) + if err != nil { + return err + } + + hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) + if hash == rws.configHash { + level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") + return nil + } + + rws.configHash = hash + + // Update write queues + newQueues := []*QueueManager{} + // TODO: we should only stop & recreate queues which have changes, + // as this can be quite disruptive. + for i, rwConf := range conf.RemoteWriteConfigs { + c, err := NewClient(i, &ClientConfig{ + URL: rwConf.URL, + Timeout: rwConf.RemoteTimeout, + HTTPClientConfig: rwConf.HTTPClientConfig, + }) + if err != nil { + return err + } + newQueues = append(newQueues, NewQueueManager( + rws.logger, + rws.walDir, + rws.samplesIn, + rwConf.QueueConfig, + conf.GlobalConfig.ExternalLabels, + rwConf.WriteRelabelConfigs, + c, + rws.flushDeadline, + )) + } + + for _, q := range rws.queues { + q.Stop() + } + + rws.queues = newQueues + for _, q := range rws.queues { + q.Start() + } + return nil +} + +// Appender implements storage.Storage. +func (rws *WriteStorage) Appender() (storage.Appender, error) { return ×tampTracker{ - storage: s, + writeStorage: rws, }, nil } +// Close closes the WriteStorage. +func (rws *WriteStorage) Close() error { + rws.mtx.Lock() + defer rws.mtx.Unlock() + for _, q := range rws.queues { + q.Stop() + } + return nil +} + type timestampTracker struct { - storage *Storage + writeStorage *WriteStorage samples int64 highestTimestamp int64 } @@ -67,7 +181,7 @@ func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float6 // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { - t.storage.samplesIn.incr(t.samples) + t.writeStorage.samplesIn.incr(t.samples) samplesIn.Add(float64(t.samples)) highestTimestamp.Set(float64(t.highestTimestamp / 1000)) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go new file mode 100644 index 0000000000..c87cf3e620 --- /dev/null +++ b/storage/remote/write_test.go @@ -0,0 +1,95 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestWriteStorageLifecycle(t *testing.T) { + dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestUpdateExternalLabels(t *testing.T) { + dir, err := ioutil.TempDir("", "TestUpdateExternalLabels") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + + externalLabels := labels.FromStrings("external", "true") + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels) + + conf.GlobalConfig.ExternalLabels = externalLabels + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, externalLabels, s.queues[0].externalLabels) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { + dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsIdempotent") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + queue := s.queues[0] + + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same") + + err = s.Close() + testutil.Ok(t, err) +}