mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 15:44:05 -08:00
Merge pull request #620 from prometheus/beorn7/persistence
Adaptively reduce the wait time for memory series maintenance.
This commit is contained in:
commit
621ac10060
|
@ -32,9 +32,10 @@ const (
|
|||
chunkLen = 1024
|
||||
|
||||
// See waitForNextFP.
|
||||
fpMaxWaitDuration = 10 * time.Second
|
||||
fpMaxSweepTime = 6 * time.Hour
|
||||
fpMaxWaitDuration = 10 * time.Second
|
||||
|
||||
// See waitForNextFP.
|
||||
maxEvictInterval = time.Minute
|
||||
|
||||
// If numChunskToPersist is this percentage of maxChunksToPersist, we
|
||||
|
@ -532,21 +533,31 @@ func (s *memorySeriesStorage) maybeEvict() {
|
|||
// another fingerprint so that we will process all fingerprints in a tenth of
|
||||
// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
|
||||
// to drop chunks after 40h, we want to cycle through all fingerprints within
|
||||
// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. If
|
||||
// s.loopStopped is closed, it will return false immediately. The estimation is
|
||||
// based on the total number of fingerprints as passed in.
|
||||
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
|
||||
// 4h. The estimation is based on the total number of fingerprints as passed
|
||||
// in. However, the maximum sweep time is capped at fpMaxSweepTime. Also, the
|
||||
// method will never wait for longer than fpMaxWaitDuration.
|
||||
//
|
||||
// The maxWaitDurationFactor can be used to reduce the waiting time if a faster
|
||||
// processing is required (for example because unpersisted chunks pile up too
|
||||
// much).
|
||||
//
|
||||
// Normally, the method returns true once the wait duration has passed. However,
|
||||
// if s.loopStopped is closed, it will return false immediately.
|
||||
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
|
||||
d := fpMaxWaitDuration
|
||||
if numberOfFPs != 0 {
|
||||
sweepTime := s.dropAfter / 10
|
||||
if sweepTime > fpMaxSweepTime {
|
||||
sweepTime = fpMaxSweepTime
|
||||
}
|
||||
d = sweepTime / time.Duration(numberOfFPs)
|
||||
if d > fpMaxWaitDuration {
|
||||
d = fpMaxWaitDuration
|
||||
calculatedWait := time.Duration(float64(sweepTime) / float64(numberOfFPs) * maxWaitDurationFactor)
|
||||
if calculatedWait < d {
|
||||
d = calculatedWait
|
||||
}
|
||||
}
|
||||
if d == 0 {
|
||||
return true
|
||||
}
|
||||
t := time.NewTimer(d)
|
||||
select {
|
||||
case <-t.C:
|
||||
|
@ -575,7 +586,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
|
|||
|
||||
for {
|
||||
// Initial wait, also important if there are no FPs yet.
|
||||
if !s.waitForNextFP(s.fpToSeries.length()) {
|
||||
if !s.waitForNextFP(s.fpToSeries.length(), 1) {
|
||||
return
|
||||
}
|
||||
begin := time.Now()
|
||||
|
@ -587,7 +598,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
|
|||
case <-s.loopStopping:
|
||||
return
|
||||
}
|
||||
s.waitForNextFP(s.fpToSeries.length())
|
||||
// Reduce the wait time by the backlog score.
|
||||
s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore())
|
||||
count++
|
||||
}
|
||||
if count > 0 {
|
||||
|
@ -616,11 +628,11 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
|
|||
)
|
||||
if err != nil {
|
||||
glog.Error("Failed to lookup archived fingerprint ranges: ", err)
|
||||
s.waitForNextFP(0)
|
||||
s.waitForNextFP(0, 1)
|
||||
continue
|
||||
}
|
||||
// Initial wait, also important if there are no FPs yet.
|
||||
if !s.waitForNextFP(len(archivedFPs)) {
|
||||
if !s.waitForNextFP(len(archivedFPs), 1) {
|
||||
return
|
||||
}
|
||||
begin := time.Now()
|
||||
|
@ -630,7 +642,8 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
|
|||
case <-s.loopStopping:
|
||||
return
|
||||
}
|
||||
s.waitForNextFP(len(archivedFPs))
|
||||
// Never speed up maintenance of archived FPs.
|
||||
s.waitForNextFP(len(archivedFPs), 1)
|
||||
}
|
||||
if len(archivedFPs) > 0 {
|
||||
glog.Infof(
|
||||
|
@ -945,7 +958,7 @@ func (s *memorySeriesStorage) isDegraded() bool {
|
|||
glog.Warning("Storage has left graceful degradation mode. Things are back to normal.")
|
||||
} else if !s.degraded && nowDegraded {
|
||||
glog.Warningf(
|
||||
"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v.",
|
||||
"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
|
||||
s.getNumChunksToPersist(),
|
||||
s.getNumChunksToPersist()*100/s.maxChunksToPersist,
|
||||
s.maxChunksToPersist,
|
||||
|
@ -955,6 +968,18 @@ func (s *memorySeriesStorage) isDegraded() bool {
|
|||
return s.degraded
|
||||
}
|
||||
|
||||
// persistenceBacklogScore works similar to isDegraded, but returns a score
|
||||
// about how close we are to degradation. This score is 1.0 if no chunks are
|
||||
// waiting for persistence and 0.0 if we are at or above the degradation
|
||||
// threshold.
|
||||
func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
|
||||
score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100)
|
||||
if score < 0 {
|
||||
return 0
|
||||
}
|
||||
return score
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
|
||||
s.persistence.Describe(ch)
|
||||
|
|
Loading…
Reference in a new issue