Tweak timing in the maintenance loop.

Change-Id: I9801c4f9a22c3b3dc1ce1af81fdd9e992a4f4dd7
This commit is contained in:
Bjoern Rabenstein 2014-11-13 16:55:15 +01:00
parent 2672aa8ece
commit d73e851b14

View file

@ -29,6 +29,11 @@ import (
const ( const (
persistQueueCap = 1024 persistQueueCap = 1024
chunkLen = 1024 chunkLen = 1024
// See waitForNextFP.
fpMaxWaitDuration = 10 * time.Second
fpMinWaitDuration = 5 * time.Millisecond // ~ hard disk seek time.
fpMaxSweepTime = 6 * time.Hour
) )
type storageState uint type storageState uint
@ -434,20 +439,26 @@ func (s *memorySeriesStorage) handlePersistQueue() {
// waitForNextFP waits an estimated duration, after which we want to process // waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of // another fingerprint so that we will process all fingerprints in a tenth of
// s.purgeAfter, e.g. if we want to purge after 10d, we want to cycle through // s.purgeAfter assuming that the system is doing nothing else, e.g. if we want
// all fingerprints within 1d. However, this method will always wait for at // to purge after 40h, we want to cycle through all fingerprints within
// least 10ms and never longer than 1m. If s.loopStopped is closed, it will // 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
// return false immediately. The estimation is based on the total number of // this method will always wait for at least fpMinWaitDuration and never longer
// fingerprints as passed in. // than fpMaxWaitDuration. If s.loopStopped is closed, it will return false
// immediately. The estimation is based on the total number of fingerprints as
// passed in.
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool { func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
d := time.Minute d := fpMaxWaitDuration
if numberOfFPs != 0 { if numberOfFPs != 0 {
d = s.purgeAfter / time.Duration(numberOfFPs*10) sweepTime := s.purgeAfter / 10
if d < 10*time.Millisecond { if sweepTime > fpMaxSweepTime {
d = 10 * time.Millisecond sweepTime = fpMaxSweepTime
} }
if d > time.Minute { d = sweepTime / time.Duration(numberOfFPs)
d = time.Minute if d < fpMinWaitDuration {
d = fpMinWaitDuration
}
if d > fpMaxWaitDuration {
d = fpMaxWaitDuration
} }
} }
t := time.NewTimer(d) t := time.NewTimer(d)
@ -579,7 +590,7 @@ loop:
glog.Infof("Done evicting chunks in %v.", duration) glog.Infof("Done evicting chunks in %v.", duration)
case fp := <-memoryFingerprints: case fp := <-memoryFingerprints:
s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter)) s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
// TODO: Move chunkdesc eviction and archiving here. // TODO: Move chunkdesc eviction, head chunk closing, and archiving here.
s.seriesOps.WithLabelValues(memoryMaintenance).Inc() s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
case fp := <-archivedFingerprints: case fp := <-archivedFingerprints:
s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter)) s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))