fix TODO: only stop & recreate remote write queues which have changes (#5540)

Signed-off-by: YaoZengzeng <yaozengzeng@zju.edu.cn>
This commit is contained in:
Yao Zengzeng 2019-09-05 01:21:53 +08:00 committed by Chris Marchbanks
parent 24844cc222
commit f65b7c296d
2 changed files with 100 additions and 14 deletions

View file

@ -50,11 +50,13 @@ type WriteStorage struct {
logger log.Logger logger log.Logger
mtx sync.Mutex mtx sync.Mutex
configHash [16]byte configHash [16]byte
walDir string externalLabelHash [16]byte
queues []*QueueManager walDir string
samplesIn *ewmaRate queues []*QueueManager
flushDeadline time.Duration hashes [][16]byte
samplesIn *ewmaRate
flushDeadline time.Duration
} }
// NewWriteStorage creates and runs a WriteStorage. // NewWriteStorage creates and runs a WriteStorage.
@ -81,6 +83,7 @@ func (rws *WriteStorage) run() {
} }
// ApplyConfig updates the state as the new config requires. // ApplyConfig updates the state as the new config requires.
// Only stop & create queues which have changes.
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.mtx.Lock() rws.mtx.Lock()
defer rws.mtx.Unlock() defer rws.mtx.Unlock()
@ -97,19 +100,38 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
return err return err
} }
hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) configHash := md5.Sum(cfgBytes)
if hash == rws.configHash { externalLabelHash := md5.Sum(externalLabelBytes)
externalLabelUnchanged := externalLabelHash == rws.externalLabelHash
if configHash == rws.configHash && externalLabelUnchanged {
level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers")
return nil return nil
} }
rws.configHash = hash rws.configHash = configHash
rws.externalLabelHash = externalLabelHash
// Update write queues // Update write queues
newQueues := []*QueueManager{} newQueues := []*QueueManager{}
// TODO: we should only stop & recreate queues which have changes, newHashes := [][16]byte{}
// as this can be quite disruptive. newClientIndexes := []int{}
for i, rwConf := range conf.RemoteWriteConfigs { for i, rwConf := range conf.RemoteWriteConfigs {
b, err := json.Marshal(rwConf)
if err != nil {
return err
}
// Use RemoteWriteConfigs and its index to get hash. So if its index changed,
// the correspoinding queue should also be restarted.
hash := md5.Sum(b)
if i < len(rws.queues) && rws.hashes[i] == hash && externalLabelUnchanged {
// The RemoteWriteConfig and index both not changed, keep the queue.
newQueues = append(newQueues, rws.queues[i])
newHashes = append(newHashes, hash)
rws.queues[i] = nil
continue
}
// Otherwise create a new queue.
c, err := NewClient(i, &ClientConfig{ c, err := NewClient(i, &ClientConfig{
URL: rwConf.URL, URL: rwConf.URL,
Timeout: rwConf.RemoteTimeout, Timeout: rwConf.RemoteTimeout,
@ -128,16 +150,24 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
c, c,
rws.flushDeadline, rws.flushDeadline,
)) ))
newHashes = append(newHashes, hash)
newClientIndexes = append(newClientIndexes, i)
} }
for _, q := range rws.queues { for _, q := range rws.queues {
q.Stop() // A nil queue means that queue has been reused.
if q != nil {
q.Stop()
}
}
for _, index := range newClientIndexes {
newQueues[index].Start()
} }
rws.queues = newQueues rws.queues = newQueues
for _, q := range rws.queues { rws.hashes = newHashes
q.Start()
}
return nil return nil
} }

View file

@ -17,7 +17,9 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
@ -93,3 +95,57 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
err = s.Close() err = s.Close()
testutil.Ok(t, err) testutil.Ok(t, err)
} }
func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsPartialUpdate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
s := NewWriteStorage(nil, dir, defaultFlushDeadline)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),
QueueConfig: config.DefaultQueueConfig,
}
c1 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(20 * time.Second),
QueueConfig: config.DefaultQueueConfig,
}
c2 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: config.DefaultQueueConfig,
}
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
s.ApplyConfig(conf)
testutil.Equals(t, 3, len(s.queues))
q := s.queues[1]
// Update c0 and c2.
c0.RemoteTimeout = model.Duration(40 * time.Second)
c2.RemoteTimeout = model.Duration(50 * time.Second)
conf = &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
s.ApplyConfig(conf)
testutil.Equals(t, 3, len(s.queues))
testutil.Assert(t, q == s.queues[1], "Pointer of unchanged queue should have remained the same")
// Delete c0.
conf = &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2},
}
s.ApplyConfig(conf)
testutil.Equals(t, 2, len(s.queues))
testutil.Assert(t, q != s.queues[1], "If the index changed, the queue should be stopped and recreated.")
err = s.Close()
testutil.Ok(t, err)
}