diff --git a/storage/local/series.go b/storage/local/series.go index 261add291..fe30e9f77 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -106,26 +106,18 @@ func (sm *seriesMap) iter() <-chan fingerprintSeriesPair { return ch } -// fpIter returns a channel that produces all fingerprints in the seriesMap. The -// channel will be closed once all fingerprints have been received. Not -// consuming all fingerprints from the channel will leak a goroutine. The -// semantics of concurrent modification of seriesMap is the similar as the one -// for iterating over a map with a 'range' clause. However, if the next element -// in iteration order is removed after the current element has been received -// from the channel, it will still be produced by the channel. -func (sm *seriesMap) fpIter() <-chan model.Fingerprint { - ch := make(chan model.Fingerprint) - go func() { - sm.mtx.RLock() - for fp := range sm.m { - sm.mtx.RUnlock() - ch <- fp - sm.mtx.RLock() - } - sm.mtx.RUnlock() - close(ch) - }() - return ch +// sortedFPs returns a sorted slice of all the fingerprints in the seriesMap. +func (sm *seriesMap) sortedFPs() model.Fingerprints { + sm.mtx.RLock() + fps := make(model.Fingerprints, 0, len(sm.m)) + for fp := range sm.m { + fps = append(fps, fp) + } + sm.mtx.RUnlock() + + // Sorting could take some time, so do it outside of the lock. + sort.Sort(fps) + return fps } type memorySeries struct { diff --git a/storage/local/storage.go b/storage/local/storage.go index b711f2839..9a0e96b6c 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -18,6 +18,7 @@ import ( "container/list" "errors" "fmt" + "math/rand" "runtime" "sort" "sync" @@ -419,11 +420,9 @@ func (s *MemorySeriesStorage) Start() (err error) { log.Info("Loading series map and head chunks...") s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads() - for fp := range s.fpToSeries.fpIter() { - if series, ok := s.fpToSeries.get(fp); ok { - if !series.headChunkClosed { - s.headChunks.Inc() - } + for _, series := range s.fpToSeries.m { + if !series.headChunkClosed { + s.headChunks.Inc() } } @@ -1330,16 +1329,8 @@ func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFact func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint { memoryFingerprints := make(chan model.Fingerprint) go func() { - var fpIter <-chan model.Fingerprint - - defer func() { - if fpIter != nil { - for range fpIter { - // Consume the iterator. - } - } - close(memoryFingerprints) - }() + defer close(memoryFingerprints) + firstPass := true for { // Initial wait, also important if there are no FPs yet. @@ -1347,9 +1338,15 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger return } begin := time.Now() - fpIter = s.fpToSeries.fpIter() + fps := s.fpToSeries.sortedFPs() + if firstPass { + // Start first pass at a random location in the + // key space to cover the whole key space even + // in the case of frequent restarts. + fps = fps[rand.Intn(len(fps)):] + } count := 0 - for fp := range fpIter { + for _, fp := range fps { select { case memoryFingerprints <- fp: case <-s.loopStopping: @@ -1364,11 +1361,16 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger count++ } if count > 0 { + msg := "full" + if firstPass { + msg = "initial partial" + } log.Infof( - "Completed maintenance sweep through %d in-memory fingerprints in %v.", - count, time.Since(begin), + "Completed %s maintenance sweep through %d in-memory fingerprints in %v.", + msg, count, time.Since(begin), ) } + firstPass = false } }()