Merge pull request #2559 from prometheus/beorn7/storage

storage: Replace fpIter by sortedFPs
This commit is contained in:
Björn Rabenstein 2017-04-03 16:56:21 +02:00 committed by GitHub
commit 1c6240fc40
2 changed files with 33 additions and 39 deletions

View file

@ -106,26 +106,18 @@ func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
return ch
}
// fpIter returns a channel that produces all fingerprints in the seriesMap. The
// channel will be closed once all fingerprints have been received. Not
// consuming all fingerprints from the channel will leak a goroutine. The
// semantics of concurrent modification of seriesMap is the similar as the one
// for iterating over a map with a 'range' clause. However, if the next element
// in iteration order is removed after the current element has been received
// from the channel, it will still be produced by the channel.
func (sm *seriesMap) fpIter() <-chan model.Fingerprint {
ch := make(chan model.Fingerprint)
go func() {
sm.mtx.RLock()
for fp := range sm.m {
sm.mtx.RUnlock()
ch <- fp
sm.mtx.RLock()
}
sm.mtx.RUnlock()
close(ch)
}()
return ch
// sortedFPs returns a sorted slice of all the fingerprints in the seriesMap.
func (sm *seriesMap) sortedFPs() model.Fingerprints {
sm.mtx.RLock()
fps := make(model.Fingerprints, 0, len(sm.m))
for fp := range sm.m {
fps = append(fps, fp)
}
sm.mtx.RUnlock()
// Sorting could take some time, so do it outside of the lock.
sort.Sort(fps)
return fps
}
type memorySeries struct {

View file

@ -18,6 +18,7 @@ import (
"container/list"
"errors"
"fmt"
"math/rand"
"runtime"
"sort"
"sync"
@ -419,11 +420,9 @@ func (s *MemorySeriesStorage) Start() (err error) {
log.Info("Loading series map and head chunks...")
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
for fp := range s.fpToSeries.fpIter() {
if series, ok := s.fpToSeries.get(fp); ok {
if !series.headChunkClosed {
s.headChunks.Inc()
}
for _, series := range s.fpToSeries.m {
if !series.headChunkClosed {
s.headChunks.Inc()
}
}
@ -1330,16 +1329,8 @@ func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFact
func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
memoryFingerprints := make(chan model.Fingerprint)
go func() {
var fpIter <-chan model.Fingerprint
defer func() {
if fpIter != nil {
for range fpIter {
// Consume the iterator.
}
}
close(memoryFingerprints)
}()
defer close(memoryFingerprints)
firstPass := true
for {
// Initial wait, also important if there are no FPs yet.
@ -1347,9 +1338,15 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
return
}
begin := time.Now()
fpIter = s.fpToSeries.fpIter()
fps := s.fpToSeries.sortedFPs()
if firstPass {
// Start first pass at a random location in the
// key space to cover the whole key space even
// in the case of frequent restarts.
fps = fps[rand.Intn(len(fps)):]
}
count := 0
for fp := range fpIter {
for _, fp := range fps {
select {
case memoryFingerprints <- fp:
case <-s.loopStopping:
@ -1364,11 +1361,16 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
count++
}
if count > 0 {
msg := "full"
if firstPass {
msg = "initial partial"
}
log.Infof(
"Completed maintenance sweep through %d in-memory fingerprints in %v.",
count, time.Since(begin),
"Completed %s maintenance sweep through %d in-memory fingerprints in %v.",
msg, count, time.Since(begin),
)
}
firstPass = false
}
}()