From 6e319532cf47030d80ff7908545c570cd08f9f17 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 23 May 2015 12:03:14 +0200 Subject: [PATCH] Read from indexing queue during crash recovery. Change #704 introduced a regression: the indexing queue was read only after potential crash recovery had completed. When crash recovery indexed more than the queue capacity, Prometheus deadlocked. --- main.go | 8 ++++++-- storage/local/storage.go | 20 ++++++++++++++------ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index 03b79e964..a146cd91e 100644 --- a/main.go +++ b/main.go @@ -239,10 +239,14 @@ func (p *prometheus) reloadConfig() bool { func (p *prometheus) Serve() { // Start all components. if err := p.storage.Start(); err != nil { - log.Error("Error opening memory series storage: ", err) + log.Errorln("Error opening memory series storage:", err) os.Exit(1) } - defer p.storage.Stop() + defer func() { + if err := p.storage.Stop(); err != nil { + log.Errorln("Error stopping storage:", err) + } + }() // The storage has to be fully initialized before registering Prometheus. registry.MustRegister(p) diff --git a/storage/local/storage.go b/storage/local/storage.go index c12a06536..699076006 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -194,7 +194,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { } // Start implements Storage. 
-func (s *memorySeriesStorage) Start() error { +func (s *memorySeriesStorage) Start() (err error) { var syncStrategy syncStrategy switch s.options.SyncStrategy { case Never: @@ -207,11 +207,22 @@ func (s *memorySeriesStorage) Start() error { panic("unknown sync strategy") } - p, err := newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy) + var p *persistence + p, err = newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy) if err != nil { return err } s.persistence = p + // Persistence must start running before loadSeriesMapAndHeads() is called. + go s.persistence.run() + + defer func() { + if err != nil { + if e := p.close(); e != nil { + log.Errorln("Error closing persistence:", e) + } + } + }() log.Info("Loading series map and head chunks...") s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads() @@ -221,13 +232,10 @@ func (s *memorySeriesStorage) Start() error { log.Infof("%d series loaded.", s.fpToSeries.length()) s.numSeries.Set(float64(s.fpToSeries.length())) - mapper, err := newFPMapper(s.fpToSeries, p) + s.mapper, err = newFPMapper(s.fpToSeries, p) if err != nil { return err } - s.mapper = mapper - - go s.persistence.run() go s.handleEvictList() go s.loop()