Merge pull request #724 from prometheus/fabxc/storage-startup

Read from indexing queue during crash recovery.
This commit is contained in:
Fabian Reinartz 2015-05-23 16:50:47 +02:00
commit a92134a947
2 changed files with 20 additions and 8 deletions

View file

@ -239,10 +239,14 @@ func (p *prometheus) reloadConfig() bool {
func (p *prometheus) Serve() { func (p *prometheus) Serve() {
// Start all components. // Start all components.
if err := p.storage.Start(); err != nil { if err := p.storage.Start(); err != nil {
log.Error("Error opening memory series storage: ", err) log.Errorln("Error opening memory series storage:", err)
os.Exit(1) os.Exit(1)
} }
defer p.storage.Stop() defer func() {
if err := p.storage.Stop(); err != nil {
log.Errorln("Error stopping storage:", err)
}
}()
// The storage has to be fully initialized before registering Prometheus. // The storage has to be fully initialized before registering Prometheus.
registry.MustRegister(p) registry.MustRegister(p)

View file

@ -194,7 +194,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
} }
// Start implements Storage. // Start implements Storage.
func (s *memorySeriesStorage) Start() error { func (s *memorySeriesStorage) Start() (err error) {
var syncStrategy syncStrategy var syncStrategy syncStrategy
switch s.options.SyncStrategy { switch s.options.SyncStrategy {
case Never: case Never:
@ -207,11 +207,22 @@ func (s *memorySeriesStorage) Start() error {
panic("unknown sync strategy") panic("unknown sync strategy")
} }
p, err := newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy) var p *persistence
p, err = newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy)
if err != nil { if err != nil {
return err return err
} }
s.persistence = p s.persistence = p
// Persistence must start running before loadSeriesMapAndHeads() is called.
go s.persistence.run()
defer func() {
if err != nil {
if e := p.close(); e != nil {
log.Errorln("Error closing persistence:", e)
}
}
}()
log.Info("Loading series map and head chunks...") log.Info("Loading series map and head chunks...")
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads() s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
@ -221,13 +232,10 @@ func (s *memorySeriesStorage) Start() error {
log.Infof("%d series loaded.", s.fpToSeries.length()) log.Infof("%d series loaded.", s.fpToSeries.length())
s.numSeries.Set(float64(s.fpToSeries.length())) s.numSeries.Set(float64(s.fpToSeries.length()))
mapper, err := newFPMapper(s.fpToSeries, p) s.mapper, err = newFPMapper(s.fpToSeries, p)
if err != nil { if err != nil {
return err return err
} }
s.mapper = mapper
go s.persistence.run()
go s.handleEvictList() go s.handleEvictList()
go s.loop() go s.loop()