Reorganize maintenance loop.

Change-Id: Iac10f988ba3e93ffb188f49c30f92e0b6adce5a3
Author: Bjoern Rabenstein
Date:   2014-11-10 22:26:07 +01:00
Parent: a5f56639b8
Commit: 3f61d304ce

4 changed files with 104 additions and 44 deletions

==== File 1 of 4 ====

@@ -55,7 +55,6 @@ var (
     memoryEvictionInterval = flag.Duration("storage.memory.evictionInterval", 15*time.Minute, "The period at which old data is evicted from memory.")
     memoryRetentionPeriod  = flag.Duration("storage.memory.retentionPeriod", time.Hour, "The period of time to retain in memory during evictions.")
-    storagePurgeInterval   = flag.Duration("storage.purgeInterval", time.Hour, "The period at which old data is deleted completely from storage.")
     storageRetentionPeriod = flag.Duration("storage.retentionPeriod", 15*24*time.Hour, "The period of time to retain in storage.")
     checkpointInterval     = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
@@ -119,7 +118,6 @@ func NewPrometheus() *prometheus {
         MemoryEvictionInterval:     *memoryEvictionInterval,
         MemoryRetentionPeriod:      *memoryRetentionPeriod,
         PersistenceStoragePath:     *metricsStoragePath,
-        PersistencePurgeInterval:   *storagePurgeInterval,
         PersistenceRetentionPeriod: *storageRetentionPeriod,
         CheckpointInterval:         *checkpointInterval,
         Dirty:                      *storageDirty,

==== File 2 of 4 ====

@@ -50,7 +50,7 @@ type memorySeriesStorage struct {
     loopStopping, loopStopped chan struct{}
     evictInterval, evictAfter time.Duration
-    purgeInterval, purgeAfter time.Duration
+    purgeAfter                time.Duration
     checkpointInterval        time.Duration
     persistQueue              chan persistRequest
@@ -74,7 +74,6 @@ type MemorySeriesStorageOptions struct {
     MemoryEvictionInterval     time.Duration // How often to check for memory eviction.
     MemoryRetentionPeriod      time.Duration // Chunks at least that old are evicted from memory.
     PersistenceStoragePath     string        // Location of persistence files.
-    PersistencePurgeInterval   time.Duration // How often to check for purging.
     PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
     CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
     Dirty                      bool          // Force the storage to consider itself dirty on startup.
@@ -109,7 +108,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
         loopStopped:        make(chan struct{}),
         evictInterval:      o.MemoryEvictionInterval,
         evictAfter:         o.MemoryRetentionPeriod,
-        purgeInterval:      o.PersistencePurgeInterval,
         purgeAfter:         o.PersistenceRetentionPeriod,
         checkpointInterval: o.CheckpointInterval,
@@ -432,22 +430,111 @@ func (s *memorySeriesStorage) handlePersistQueue() {
     close(s.persistStopped)
 }
 
+// waitForNextFP waits an estimated duration, after which we want to process
+// another fingerprint so that we will process all fingerprints in a tenth of
+// s.purgeAfter, e.g. if we want to purge after 10d, we want to cycle through
+// all fingerprints within 1d. However, this method will always wait for at
+// least 10ms and never longer than 1m. If s.loopStopping is closed, it will
+// return false immediately. The estimation is based on the total number of
+// fingerprints as passed in.
+func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
+    d := time.Minute
+    if numberOfFPs != 0 {
+        d = s.purgeAfter / time.Duration(numberOfFPs*10)
+        if d < 10*time.Millisecond {
+            d = 10 * time.Millisecond
+        }
+        if d > time.Minute {
+            d = time.Minute
+        }
+    }
+    t := time.NewTimer(d)
+    select {
+    case <-t.C:
+        return true
+    case <-s.loopStopping:
+        return false
+    }
+}
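The pacing rule above spreads one full sweep over a tenth of the retention period, clamped to between 10ms and 1m per fingerprint. A self-contained sketch of the same clamp arithmetic, with illustrative numbers that are not part of this commit:

    package main

    import (
        "fmt"
        "time"
    )

    // nextFPWait mirrors waitForNextFP's estimate: divide a tenth of the
    // retention period evenly among all fingerprints, clamped to [10ms, 1m].
    func nextFPWait(purgeAfter time.Duration, numberOfFPs int) time.Duration {
        d := time.Minute
        if numberOfFPs != 0 {
            d = purgeAfter / time.Duration(numberOfFPs*10)
            if d < 10*time.Millisecond {
                d = 10 * time.Millisecond
            }
            if d > time.Minute {
                d = time.Minute
            }
        }
        return d
    }

    func main() {
        // With the 15d default of -storage.retentionPeriod and 100,000
        // series, each step waits 1.296s, so a complete sweep takes ~36h,
        // i.e. a tenth of the retention period.
        fmt.Println(nextFPWait(15*24*time.Hour, 100000))
    }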
 func (s *memorySeriesStorage) loop() {
     evictTicker := time.NewTicker(s.evictInterval)
-    purgeTicker := time.NewTicker(s.purgeInterval)
     checkpointTicker := time.NewTicker(s.checkpointInterval)
 
     defer func() {
         evictTicker.Stop()
-        purgeTicker.Stop()
         checkpointTicker.Stop()
         glog.Info("Maintenance loop stopped.")
         close(s.loopStopped)
     }()
 
+    memoryFingerprints := make(chan clientmodel.Fingerprint)
+    go func() {
+        var fpIter <-chan clientmodel.Fingerprint
+
+        defer func() {
+            if fpIter != nil {
+                for _ = range fpIter {
+                    // Consume the iterator.
+                }
+            }
+            close(memoryFingerprints)
+        }()
+
+        for {
+            // Initial wait, also important if there are no FPs yet.
+            if !s.waitForNextFP(s.fpToSeries.length()) {
+                return
+            }
+            begun := time.Now()
+            fpIter = s.fpToSeries.fpIter()
+            for fp := range fpIter {
+                select {
+                case memoryFingerprints <- fp:
+                case <-s.loopStopping:
+                    return
+                }
+                s.waitForNextFP(s.fpToSeries.length())
+            }
+            glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begun))
+        }
+    }()
+
+    archivedFingerprints := make(chan clientmodel.Fingerprint)
+    go func() {
+        defer close(archivedFingerprints)
+
+        for {
+            archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
+                clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter),
+            )
+            if err != nil {
+                glog.Error("Failed to lookup archived fingerprint ranges: ", err)
+                s.waitForNextFP(0)
+                continue
+            }
+            // Initial wait, also important if there are no FPs yet.
+            if !s.waitForNextFP(len(archivedFPs)) {
+                return
+            }
+            begun := time.Now()
+            for _, fp := range archivedFPs {
+                select {
+                case archivedFingerprints <- fp:
+                case <-s.loopStopping:
+                    return
+                }
+                s.waitForNextFP(len(archivedFPs))
+            }
+            glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begun))
+        }
+    }()
+
+loop:
     for {
         select {
         case <-s.loopStopping:
-            return
+            break loop
         case <-checkpointTicker.C:
             s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
         case <-evictTicker.C:
@@ -459,7 +546,7 @@ func (s *memorySeriesStorage) loop() {
                 select {
                 case <-s.loopStopping:
                     glog.Info("Interrupted evicting chunks.")
-                    return
+                    break loop
                 default:
                     // Keep going.
                 }
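A note on the return-to-break-loop changes: inside a select, a bare break only leaves the select statement, and a return would skip the channel draining now added at the end of loop(), so the labeled break is the one form that exits the outer for while still running the cleanup after it. A minimal illustration with hypothetical names:

    package main

    import "fmt"

    func main() {
        stop := make(chan struct{})
        work := make(chan int)
        go func() {
            work <- 1
            close(stop)
        }()
    loop:
        for {
            select {
            case <-stop:
                break loop // a bare "break" here would only exit the select
            case w := <-work:
                fmt.Println("processed", w)
            }
        }
        // Unlike "return", "break loop" still reaches this point.
        fmt.Println("cleanup runs")
    }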
@@ -488,40 +575,18 @@ func (s *memorySeriesStorage) loop() {
             duration := time.Since(begin)
             s.evictDuration.Set(float64(duration) / float64(time.Millisecond))
             glog.Infof("Done evicting chunks in %v.", duration)
-        case <-purgeTicker.C:
-            glog.Info("Purging old series data...")
-            ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter)
-            begin := time.Now()
-
-            for fp := range s.fpToSeries.fpIter() {
-                select {
-                case <-s.loopStopping:
-                    glog.Info("Interrupted purging series.")
-                    return
-                default:
-                    s.purgeSeries(fp, ts)
-                }
-            }
-
-            persistedFPs, err := s.persistence.getFingerprintsModifiedBefore(ts)
-            if err != nil {
-                glog.Error("Failed to lookup persisted fingerprint ranges: ", err)
-                break
-            }
-            for _, fp := range persistedFPs {
-                select {
-                case <-s.loopStopping:
-                    glog.Info("Interrupted purging series.")
-                    return
-                default:
-                    s.purgeSeries(fp, ts)
-                }
-            }
-
-            duration := time.Since(begin)
-            s.purgeDuration.Set(float64(duration) / float64(time.Millisecond))
-            glog.Infof("Done purging old series data in %v.", duration)
+        case fp := <-memoryFingerprints:
+            s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
+            // TODO: Move chunkdesc eviction and archiving here.
+        case fp := <-archivedFingerprints:
+            s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
         }
     }
+    // Wait until both channels are closed.
+    for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-memoryFingerprints {
+    }
+    for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-archivedFingerprints {
+    }
 }
 
 // purgeSeries purges chunks older than beforeTime from a series. If the series
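Taken together, the reorganized loop follows a common Go shutdown pattern: each producer goroutine owns and closes its channel, the consumer leaves the loop on the stopping signal, and then drains both channels so a producer blocked on its final send can finish and close. A condensed, self-contained sketch of that pattern, with hypothetical names that are not code from this commit:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        stopping := make(chan struct{})
        fps := make(chan int)

        // Producer: owns the channel and closes it on the way out, just as
        // the fingerprint sweepers above close theirs.
        go func() {
            defer close(fps)
            for i := 0; ; i++ {
                select {
                case fps <- i:
                    time.Sleep(10 * time.Millisecond) // stand-in for waitForNextFP's pacing
                case <-stopping:
                    return
                }
            }
        }()

        go func() {
            time.Sleep(50 * time.Millisecond)
            close(stopping) // request shutdown
        }()

    loop:
        for {
            select {
            case <-stopping:
                break loop
            case fp := <-fps:
                fmt.Println("maintaining fingerprint", fp)
            }
        }
        // Drain until closed, mirroring the channelStillOpen loops above, so
        // a send that raced with the stop signal cannot block the producer.
        for range fps {
        }
    }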

==== File 3 of 4 ====

@@ -44,7 +44,6 @@ func TestLoop(t *testing.T) {
     o := &MemorySeriesStorageOptions{
         MemoryEvictionInterval:     100 * time.Millisecond,
         MemoryRetentionPeriod:      time.Hour,
-        PersistencePurgeInterval:   150 * time.Millisecond,
         PersistenceRetentionPeriod: 24 * 7 * time.Hour,
         PersistenceStoragePath:     directory.Path(),
         CheckpointInterval:         250 * time.Millisecond,
@@ -492,7 +491,6 @@ func BenchmarkFuzz(b *testing.B) {
     o := &MemorySeriesStorageOptions{
         MemoryEvictionInterval:     time.Second,
         MemoryRetentionPeriod:      10 * time.Minute,
-        PersistencePurgeInterval:   10 * time.Second,
         PersistenceRetentionPeriod: time.Hour,
         PersistenceStoragePath:     directory.Path(),
         CheckpointInterval:         3 * time.Second,

==== File 4 of 4 ====

@@ -38,7 +38,6 @@ func NewTestStorage(t testing.TB) (Storage, test.Closer) {
    o := &MemorySeriesStorageOptions{
        MemoryEvictionInterval:     time.Minute,
        MemoryRetentionPeriod:      time.Hour,
-       PersistencePurgeInterval:   time.Hour,
        PersistenceRetentionPeriod: 24 * 7 * time.Hour,
        PersistenceStoragePath:     directory.Path(),
        CheckpointInterval:         time.Hour,