mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Do not start storage processing before Start() is called.
This commit is contained in:
parent
9ca47869ed
commit
d8440d75f1
24
main.go
24
main.go
|
@ -118,11 +118,7 @@ func NewPrometheus() *prometheus {
|
||||||
PedanticChecks: *storagePedanticChecks,
|
PedanticChecks: *storagePedanticChecks,
|
||||||
SyncStrategy: syncStrategy,
|
SyncStrategy: syncStrategy,
|
||||||
}
|
}
|
||||||
memStorage, err := local.NewMemorySeriesStorage(o)
|
memStorage := local.NewMemorySeriesStorage(o)
|
||||||
if err != nil {
|
|
||||||
glog.Error("Error opening memory series storage: ", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
var sampleAppender storage.SampleAppender
|
var sampleAppender storage.SampleAppender
|
||||||
var remoteStorageQueues []*remote.StorageQueueManager
|
var remoteStorageQueues []*remote.StorageQueueManager
|
||||||
|
@ -213,38 +209,47 @@ func NewPrometheus() *prometheus {
|
||||||
}
|
}
|
||||||
webService.QuitChan = make(chan struct{})
|
webService.QuitChan = make(chan struct{})
|
||||||
|
|
||||||
p.reloadConfig()
|
if !p.reloadConfig() {
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *prometheus) reloadConfig() {
|
func (p *prometheus) reloadConfig() bool {
|
||||||
glog.Infof("Loading configuration file %s", *configFile)
|
glog.Infof("Loading configuration file %s", *configFile)
|
||||||
|
|
||||||
conf, err := config.LoadFromFile(*configFile)
|
conf, err := config.LoadFromFile(*configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err)
|
glog.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err)
|
||||||
glog.Errorf("Note: The configuration format has changed with version 0.14, please check the documentation.")
|
glog.Errorf("Note: The configuration format has changed with version 0.14, please check the documentation.")
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
p.webService.StatusHandler.ApplyConfig(conf)
|
p.webService.StatusHandler.ApplyConfig(conf)
|
||||||
p.targetManager.ApplyConfig(conf)
|
p.targetManager.ApplyConfig(conf)
|
||||||
p.ruleManager.ApplyConfig(conf)
|
p.ruleManager.ApplyConfig(conf)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve starts the Prometheus server. It returns after the server has been shut
|
// Serve starts the Prometheus server. It returns after the server has been shut
|
||||||
// down. The method installs an interrupt handler, allowing to trigger a
|
// down. The method installs an interrupt handler, allowing to trigger a
|
||||||
// shutdown by sending SIGTERM to the process.
|
// shutdown by sending SIGTERM to the process.
|
||||||
func (p *prometheus) Serve() {
|
func (p *prometheus) Serve() {
|
||||||
|
if err := p.storage.Start(); err != nil {
|
||||||
|
glog.Error("Error opening memory series storage: ", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
for _, q := range p.remoteStorageQueues {
|
for _, q := range p.remoteStorageQueues {
|
||||||
go q.Run()
|
go q.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
go p.ruleManager.Run()
|
go p.ruleManager.Run()
|
||||||
go p.notificationHandler.Run()
|
go p.notificationHandler.Run()
|
||||||
go p.targetManager.Run()
|
go p.targetManager.Run()
|
||||||
|
|
||||||
p.storage.Start()
|
registry.MustRegister(p)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := p.webService.ServeForever(*pathPrefix)
|
err := p.webService.ServeForever(*pathPrefix)
|
||||||
|
@ -387,6 +392,5 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
p := NewPrometheus()
|
p := NewPrometheus()
|
||||||
registry.MustRegister(p)
|
|
||||||
p.Serve()
|
p.Serve()
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ type Storage interface {
|
||||||
// Run the various maintenance loops in goroutines. Returns when the
|
// Run the various maintenance loops in goroutines. Returns when the
|
||||||
// storage is ready to use. Keeps everything running in the background
|
// storage is ready to use. Keeps everything running in the background
|
||||||
// until Stop is called.
|
// until Stop is called.
|
||||||
Start()
|
Start() error
|
||||||
// Stop shuts down the Storage gracefully, flushes all pending
|
// Stop shuts down the Storage gracefully, flushes all pending
|
||||||
// operations, stops all maintenance loops,and frees all resources.
|
// operations, stops all maintenance loops,and frees all resources.
|
||||||
Stop() error
|
Stop() error
|
||||||
|
|
|
@ -268,10 +268,13 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync
|
||||||
p.labelPairToFingerprints = labelPairToFingerprints
|
p.labelPairToFingerprints = labelPairToFingerprints
|
||||||
p.labelNameToLabelValues = labelNameToLabelValues
|
p.labelNameToLabelValues = labelNameToLabelValues
|
||||||
|
|
||||||
go p.processIndexingQueue()
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *persistence) run() {
|
||||||
|
p.processIndexingQueue()
|
||||||
|
}
|
||||||
|
|
||||||
// Describe implements prometheus.Collector.
|
// Describe implements prometheus.Collector.
|
||||||
func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
|
func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
|
||||||
ch <- p.indexingQueueLength.Desc()
|
ch <- p.indexingQueueLength.Desc()
|
||||||
|
|
|
@ -42,6 +42,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes
|
||||||
dir.Close()
|
dir.Close()
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
go p.run()
|
||||||
return p, test.NewCallbackCloser(func() {
|
return p, test.NewCallbackCloser(func() {
|
||||||
p.close()
|
p.close()
|
||||||
dir.Close()
|
dir.Close()
|
||||||
|
|
|
@ -82,6 +82,8 @@ type memorySeriesStorage struct {
|
||||||
fpLocker *fingerprintLocker
|
fpLocker *fingerprintLocker
|
||||||
fpToSeries *seriesMap
|
fpToSeries *seriesMap
|
||||||
|
|
||||||
|
options *MemorySeriesStorageOptions
|
||||||
|
|
||||||
loopStopping, loopStopped chan struct{}
|
loopStopping, loopStopped chan struct{}
|
||||||
maxMemoryChunks int
|
maxMemoryChunks int
|
||||||
dropAfter time.Duration
|
dropAfter time.Duration
|
||||||
|
@ -124,10 +126,12 @@ type MemorySeriesStorageOptions struct {
|
||||||
|
|
||||||
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
|
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
|
||||||
// has to be called to start the storage.
|
// has to be called to start the storage.
|
||||||
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
|
||||||
s := &memorySeriesStorage{
|
s := &memorySeriesStorage{
|
||||||
fpLocker: newFingerprintLocker(1024),
|
fpLocker: newFingerprintLocker(1024),
|
||||||
|
|
||||||
|
options: o,
|
||||||
|
|
||||||
loopStopping: make(chan struct{}),
|
loopStopping: make(chan struct{}),
|
||||||
loopStopped: make(chan struct{}),
|
loopStopped: make(chan struct{}),
|
||||||
maxMemoryChunks: o.MemoryChunks,
|
maxMemoryChunks: o.MemoryChunks,
|
||||||
|
@ -185,9 +189,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
||||||
[]string{seriesLocationLabel},
|
[]string{seriesLocationLabel},
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start implements Storage.
|
||||||
|
func (s *memorySeriesStorage) Start() error {
|
||||||
var syncStrategy syncStrategy
|
var syncStrategy syncStrategy
|
||||||
switch o.SyncStrategy {
|
switch s.options.SyncStrategy {
|
||||||
case Never:
|
case Never:
|
||||||
syncStrategy = func() bool { return false }
|
syncStrategy = func() bool { return false }
|
||||||
case Always:
|
case Always:
|
||||||
|
@ -198,33 +206,32 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
||||||
panic("unknown sync strategy")
|
panic("unknown sync strategy")
|
||||||
}
|
}
|
||||||
|
|
||||||
p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks, syncStrategy)
|
p, err := newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
s.persistence = p
|
s.persistence = p
|
||||||
|
|
||||||
glog.Info("Loading series map and head chunks...")
|
glog.Info("Loading series map and head chunks...")
|
||||||
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
|
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
glog.Infof("%d series loaded.", s.fpToSeries.length())
|
glog.Infof("%d series loaded.", s.fpToSeries.length())
|
||||||
s.numSeries.Set(float64(s.fpToSeries.length()))
|
s.numSeries.Set(float64(s.fpToSeries.length()))
|
||||||
|
|
||||||
mapper, err := newFPMapper(s.fpToSeries, p)
|
mapper, err := newFPMapper(s.fpToSeries, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
s.mapper = mapper
|
s.mapper = mapper
|
||||||
|
|
||||||
return s, nil
|
go s.persistence.run()
|
||||||
}
|
|
||||||
|
|
||||||
// Start implements Storage.
|
|
||||||
func (s *memorySeriesStorage) Start() {
|
|
||||||
go s.handleEvictList()
|
go s.handleEvictList()
|
||||||
go s.loop()
|
go s.loop()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop implements Storage.
|
// Stop implements Storage.
|
||||||
|
|
|
@ -163,9 +163,9 @@ func TestLoop(t *testing.T) {
|
||||||
CheckpointInterval: 250 * time.Millisecond,
|
CheckpointInterval: 250 * time.Millisecond,
|
||||||
SyncStrategy: Adaptive,
|
SyncStrategy: Adaptive,
|
||||||
}
|
}
|
||||||
storage, err := NewMemorySeriesStorage(o)
|
storage := NewMemorySeriesStorage(o)
|
||||||
if err != nil {
|
if err := storage.Start; err != nil {
|
||||||
t.Fatalf("Error creating storage: %s", err)
|
t.Fatalf("Error starting storage: %s", err)
|
||||||
}
|
}
|
||||||
storage.Start()
|
storage.Start()
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
|
@ -731,9 +731,9 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
|
||||||
CheckpointInterval: time.Second,
|
CheckpointInterval: time.Second,
|
||||||
SyncStrategy: Adaptive,
|
SyncStrategy: Adaptive,
|
||||||
}
|
}
|
||||||
s, err := NewMemorySeriesStorage(o)
|
s := NewMemorySeriesStorage(o)
|
||||||
if err != nil {
|
if err := s.Start(); err != nil {
|
||||||
b.Fatalf("Error creating storage: %s", err)
|
b.Fatalf("Error starting storage: %s", err)
|
||||||
}
|
}
|
||||||
s.Start()
|
s.Start()
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
|
@ -48,14 +48,12 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (*memorySeriesStorage, tes
|
||||||
CheckpointInterval: time.Hour,
|
CheckpointInterval: time.Hour,
|
||||||
SyncStrategy: Adaptive,
|
SyncStrategy: Adaptive,
|
||||||
}
|
}
|
||||||
storage, err := NewMemorySeriesStorage(o)
|
storage := NewMemorySeriesStorage(o)
|
||||||
if err != nil {
|
if err := storage.Start(); err != nil {
|
||||||
directory.Close()
|
directory.Close()
|
||||||
t.Fatalf("Error creating storage: %s", err)
|
t.Fatalf("Error creating storage: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
storage.Start()
|
|
||||||
|
|
||||||
closer := &testStorageCloser{
|
closer := &testStorageCloser{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
directory: directory,
|
directory: directory,
|
||||||
|
|
Loading…
Reference in a new issue