From 08c17df24434d289f13319928ef22f79f926d9b2 Mon Sep 17 00:00:00 2001 From: machine424 Date: Sat, 11 Nov 2023 14:07:09 +0100 Subject: [PATCH 1/2] remote/storage.go: add a test to highlight a race condition between Storage.Notify() and Storage.ApplyConfig() see https://github.com/prometheus/prometheus/issues/12747 Signed-off-by: machine424 --- storage/remote/storage_test.go | 38 ++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index b2848f933..040a23a5a 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -14,7 +14,9 @@ package remote import ( + "fmt" "net/url" + "sync" "testing" common_config "github.com/prometheus/common/config" @@ -147,3 +149,39 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig { } return &cfg } + +// TestWriteStorageApplyConfigsDuringCommit helps detecting races when +// ApplyConfig runs concurrently with Notify +// See https://github.com/prometheus/prometheus/issues/12747 +func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) { + s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil) + + var wg sync.WaitGroup + wg.Add(2000) + + start := make(chan struct{}) + for i := 0; i < 1000; i++ { + go func(i int) { + <-start + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + baseRemoteWriteConfig(fmt.Sprintf("http://test-%d.com", i)), + }, + } + require.NoError(t, s.ApplyConfig(conf)) + wg.Done() + }(i) + } + + for i := 0; i < 1000; i++ { + go func() { + <-start + s.Notify() + wg.Done() + }() + } + + close(start) + wg.Wait() +} From 413b713aa8f4ed666ea2820d351333d4bb24fa7e Mon Sep 17 00:00:00 2001 From: machine424 Date: Mon, 13 Nov 2023 13:26:02 +0100 Subject: [PATCH 2/2] remote/storage.go: adjust Storage.Notify() to avoid a race condition with Storage.ApplyConfig() Signed-off-by: machine424 --- storage/remote/storage.go | 5 +---- storage/remote/write.go | 10 ++++++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index b6533f927..758ba3cc9 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -77,10 +77,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal } func (s *Storage) Notify() { - for _, q := range s.rws.queues { - // These should all be non blocking - q.watcher.Notify() - } + s.rws.Notify() } // ApplyConfig updates the state as the new config requires. diff --git a/storage/remote/write.go b/storage/remote/write.go index 4b0a24901..237f8caa9 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -121,6 +121,16 @@ func (rws *WriteStorage) run() { } } +func (rws *WriteStorage) Notify() { + rws.mtx.Lock() + defer rws.mtx.Unlock() + + for _, q := range rws.queues { + // These should all be non blocking + q.watcher.Notify() + } +} + // ApplyConfig updates the state as the new config requires. // Only stop & create queues which have changes. func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {