diff --git a/storage/remote/storage.go b/storage/remote/storage.go index bdb14ee29..d1e83fdf6 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -82,12 +82,53 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { s.mtx.Lock() defer s.mtx.Unlock() + if err := s.applyRemoteWriteConfig(conf); err != nil { + return err + } + + // Update read clients + queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) + for i, rrConf := range conf.RemoteReadConfigs { + c, err := NewClient(i, &ClientConfig{ + URL: rrConf.URL, + Timeout: rrConf.RemoteTimeout, + HTTPClientConfig: rrConf.HTTPClientConfig, + }) + if err != nil { + return err + } + + q := QueryableClient(c) + q = ExternalLabelsHandler(q, conf.GlobalConfig.ExternalLabels) + if len(rrConf.RequiredMatchers) > 0 { + q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) + } + if !rrConf.ReadRecent { + q = PreferLocalStorageFilter(q, s.localStartTimeCallback) + } + queryables = append(queryables, q) + } + s.queryables = queryables + + return nil +} + +// applyRemoteWriteConfig applies the remote write config only if the config has changed. +// The caller must hold the lock on s.mtx. +func (s *Storage) applyRemoteWriteConfig(conf *config.Config) error { + // Remote write queues only need to change if the remote write config or + // external labels change. Hash these together and only reload if the hash + // changes. cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) if err != nil { return err } + externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) + if err != nil { + return err + } - hash := md5.Sum(cfgBytes) + hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) if hash == s.configHash { level.Debug(s.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") return nil @@ -129,30 +170,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { q.Start() } - // Update read clients - queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) - for i, rrConf := range conf.RemoteReadConfigs { - c, err := NewClient(i, &ClientConfig{ - URL: rrConf.URL, - Timeout: rrConf.RemoteTimeout, - HTTPClientConfig: rrConf.HTTPClientConfig, - }) - if err != nil { - return err - } - - q := QueryableClient(c) - q = ExternalLabelsHandler(q, conf.GlobalConfig.ExternalLabels) - if len(rrConf.RequiredMatchers) > 0 { - q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) - } - if !rrConf.ReadRecent { - q = PreferLocalStorageFilter(q, s.localStartTimeCallback) - } - queryables = append(queryables, q) - } - s.queryables = queryables - return nil } diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go new file mode 100644 index 000000000..285b4e3bd --- /dev/null +++ b/storage/remote/storage_test.go @@ -0,0 +1,130 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestStorageLifecycle(t *testing.T) { + dir, err := ioutil.TempDir("", "TestStorageLifecycle") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + RemoteReadConfigs: []*config.RemoteReadConfig{ + &config.DefaultRemoteReadConfig, + }, + } + s.ApplyConfig(conf) + + // make sure remote write has a queue. + testutil.Equals(t, 1, len(s.queues)) + + // make sure remote write has a queue. + testutil.Equals(t, 1, len(s.queryables)) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestUpdateExternalLabels(t *testing.T) { + dir, err := ioutil.TempDir("", "TestUpdateExternalLabels") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) + + externalLabels := labels.FromStrings("external", "true") + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels) + + conf.GlobalConfig.ExternalLabels = externalLabels + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, externalLabels, s.queues[0].externalLabels) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestUpdateRemoteReadConfigs(t *testing.T) { + dir, err := ioutil.TempDir("", "TestUpdateRemoteReadConfigs") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) + + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + } + s.ApplyConfig(conf) + testutil.Equals(t, 0, len(s.queryables)) + + conf.RemoteReadConfigs = []*config.RemoteReadConfig{ + &config.DefaultRemoteReadConfig, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queryables)) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestUpdateRemoteWriteConfigsNoop(t *testing.T) { + dir, err := ioutil.TempDir("", "TestUpdateRemoteWriteConfigsNoop") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) + + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + queue := s.queues[0] + + conf.RemoteReadConfigs = []*config.RemoteReadConfig{ + &config.DefaultRemoteReadConfig, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same") + + err = s.Close() + testutil.Ok(t, err) +}