mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #13135 from machine424/race-ww
remote-write: fix race condition between ApplyConfig and Notify
This commit is contained in:
commit
c44cab45b9
|
@ -77,10 +77,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) Notify() {
|
func (s *Storage) Notify() {
|
||||||
for _, q := range s.rws.queues {
|
s.rws.Notify()
|
||||||
// These should all be non blocking
|
|
||||||
q.watcher.Notify()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyConfig updates the state as the new config requires.
|
// ApplyConfig updates the state as the new config requires.
|
||||||
|
|
|
@ -14,7 +14,9 @@
|
||||||
package remote
|
package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
common_config "github.com/prometheus/common/config"
|
common_config "github.com/prometheus/common/config"
|
||||||
|
@ -147,3 +149,39 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
|
||||||
}
|
}
|
||||||
return &cfg
|
return &cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestWriteStorageApplyConfigsDuringCommit helps detecting races when
|
||||||
|
// ApplyConfig runs concurrently with Notify
|
||||||
|
// See https://github.com/prometheus/prometheus/issues/12747
|
||||||
|
func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
|
||||||
|
s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2000)
|
||||||
|
|
||||||
|
start := make(chan struct{})
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
<-start
|
||||||
|
conf := &config.Config{
|
||||||
|
GlobalConfig: config.DefaultGlobalConfig,
|
||||||
|
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
||||||
|
baseRemoteWriteConfig(fmt.Sprintf("http://test-%d.com", i)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
require.NoError(t, s.ApplyConfig(conf))
|
||||||
|
wg.Done()
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
s.Notify()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
|
@ -121,6 +121,16 @@ func (rws *WriteStorage) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rws *WriteStorage) Notify() {
|
||||||
|
rws.mtx.Lock()
|
||||||
|
defer rws.mtx.Unlock()
|
||||||
|
|
||||||
|
for _, q := range rws.queues {
|
||||||
|
// These should all be non blocking
|
||||||
|
q.watcher.Notify()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ApplyConfig updates the state as the new config requires.
|
// ApplyConfig updates the state as the new config requires.
|
||||||
// Only stop & create queues which have changes.
|
// Only stop & create queues which have changes.
|
||||||
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
|
|
Loading…
Reference in a new issue