diff --git a/main.go b/main.go
index f39f148ea6..76062ea342 100644
--- a/main.go
+++ b/main.go
@@ -55,7 +55,6 @@ var (
 	memoryEvictionInterval = flag.Duration("storage.memory.evictionInterval", 15*time.Minute, "The period at which old data is evicted from memory.")
 	memoryRetentionPeriod  = flag.Duration("storage.memory.retentionPeriod", time.Hour, "The period of time to retain in memory during evictions.")
 
-	storagePurgeInterval   = flag.Duration("storage.purgeInterval", time.Hour, "The period at which old data is deleted completely from storage.")
 	storageRetentionPeriod = flag.Duration("storage.retentionPeriod", 15*24*time.Hour, "The period of time to retain in storage.")
 
 	checkpointInterval = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
@@ -119,7 +118,6 @@ func NewPrometheus() *prometheus {
 		MemoryEvictionInterval:     *memoryEvictionInterval,
 		MemoryRetentionPeriod:      *memoryRetentionPeriod,
 		PersistenceStoragePath:     *metricsStoragePath,
-		PersistencePurgeInterval:   *storagePurgeInterval,
 		PersistenceRetentionPeriod: *storageRetentionPeriod,
 		CheckpointInterval:         *checkpointInterval,
 		Dirty:                      *storageDirty,
diff --git a/storage/local/storage.go b/storage/local/storage.go
index d1e6302bc6..cd628c9a79 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -50,7 +50,7 @@ type memorySeriesStorage struct {
 
 	loopStopping, loopStopped chan struct{}
 	evictInterval, evictAfter time.Duration
-	purgeInterval, purgeAfter time.Duration
+	purgeAfter                time.Duration
 	checkpointInterval        time.Duration
 
 	persistQueue    chan persistRequest
@@ -74,7 +74,6 @@ type MemorySeriesStorageOptions struct {
 	MemoryEvictionInterval     time.Duration // How often to check for memory eviction.
 	MemoryRetentionPeriod      time.Duration // Chunks at least that old are evicted from memory.
 	PersistenceStoragePath     string        // Location of persistence files.
-	PersistencePurgeInterval   time.Duration // How often to check for purging.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
@@ -109,7 +108,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		loopStopped:        make(chan struct{}),
 		evictInterval:      o.MemoryEvictionInterval,
 		evictAfter:         o.MemoryRetentionPeriod,
-		purgeInterval:      o.PersistencePurgeInterval,
 		purgeAfter:         o.PersistenceRetentionPeriod,
 		checkpointInterval: o.CheckpointInterval,
 
@@ -432,22 +430,111 @@ func (s *memorySeriesStorage) handlePersistQueue() {
 	close(s.persistStopped)
 }
 
+// waitForNextFP waits an estimated duration, after which we want to process
+// another fingerprint so that we will process all fingerprints in a tenth of
+// s.purgeAfter, e.g. if we want to purge after 10d, we want to cycle through
+// all fingerprints within 1d. However, this method will always wait for at
+// least 10ms and never longer than 1m. If s.loopStopping is closed, it will
+// return false immediately. The estimation is based on the total number of
+// fingerprints as passed in.
+func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
+	d := time.Minute
+	if numberOfFPs != 0 {
+		d = s.purgeAfter / time.Duration(numberOfFPs*10)
+		if d < 10*time.Millisecond {
+			d = 10 * time.Millisecond
+		}
+		if d > time.Minute {
+			d = time.Minute
+		}
+	}
+	t := time.NewTimer(d)
+	select {
+	case <-t.C:
+		return true
+	case <-s.loopStopping:
+		return false
+	}
+}
+
 func (s *memorySeriesStorage) loop() {
 	evictTicker := time.NewTicker(s.evictInterval)
-	purgeTicker := time.NewTicker(s.purgeInterval)
 	checkpointTicker := time.NewTicker(s.checkpointInterval)
+
 	defer func() {
 		evictTicker.Stop()
-		purgeTicker.Stop()
 		checkpointTicker.Stop()
 		glog.Info("Maintenance loop stopped.")
 		close(s.loopStopped)
 	}()
 
+	memoryFingerprints := make(chan clientmodel.Fingerprint)
+	go func() {
+		var fpIter <-chan clientmodel.Fingerprint
+
+		defer func() {
+			if fpIter != nil {
+				for _ = range fpIter {
+					// Consume the iterator.
+				}
+			}
+			close(memoryFingerprints)
+		}()
+
+		for {
+			// Initial wait, also important if there are no FPs yet.
+			if !s.waitForNextFP(s.fpToSeries.length()) {
+				return
+			}
+			begun := time.Now()
+			fpIter = s.fpToSeries.fpIter()
+			for fp := range fpIter {
+				select {
+				case memoryFingerprints <- fp:
+				case <-s.loopStopping:
+					return
+				}
+				s.waitForNextFP(s.fpToSeries.length())
+			}
+			glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begun))
+		}
+	}()
+
+	archivedFingerprints := make(chan clientmodel.Fingerprint)
+	go func() {
+		defer close(archivedFingerprints)
+
+		for {
+			archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
+				clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter),
+			)
+			if err != nil {
+				glog.Error("Failed to lookup archived fingerprint ranges: ", err)
+				s.waitForNextFP(0)
+				continue
+			}
+			// Initial wait, also important if there are no FPs yet.
+			if !s.waitForNextFP(len(archivedFPs)) {
+				return
+			}
+			begun := time.Now()
+			for _, fp := range archivedFPs {
+				select {
+				case archivedFingerprints <- fp:
+				case <-s.loopStopping:
+					return
+				}
+				s.waitForNextFP(len(archivedFPs))
+			}
+			glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begun))
+		}
+	}()
+
+loop:
 	for {
 		select {
 		case <-s.loopStopping:
-			return
+			break loop
 		case <-checkpointTicker.C:
 			s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
 		case <-evictTicker.C:
@@ -459,7 +546,7 @@ func (s *memorySeriesStorage) loop() {
 				select {
 				case <-s.loopStopping:
 					glog.Info("Interrupted evicting chunks.")
-					return
+					break loop
 				default:
 					// Keep going.
 				}
@@ -488,40 +575,18 @@ func (s *memorySeriesStorage) loop() {
 			duration := time.Since(begin)
 			s.evictDuration.Set(float64(duration) / float64(time.Millisecond))
 			glog.Infof("Done evicting chunks in %v.", duration)
-		case <-purgeTicker.C:
-			glog.Info("Purging old series data...")
-			ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter)
-			begin := time.Now()
-
-			for fp := range s.fpToSeries.fpIter() {
-				select {
-				case <-s.loopStopping:
-					glog.Info("Interrupted purging series.")
-					return
-				default:
-					s.purgeSeries(fp, ts)
-				}
-			}
-
-			persistedFPs, err := s.persistence.getFingerprintsModifiedBefore(ts)
-			if err != nil {
-				glog.Error("Failed to lookup persisted fingerprint ranges: ", err)
-				break
-			}
-			for _, fp := range persistedFPs {
-				select {
-				case <-s.loopStopping:
-					glog.Info("Interrupted purging series.")
-					return
-				default:
-					s.purgeSeries(fp, ts)
-				}
-			}
-			duration := time.Since(begin)
-			s.purgeDuration.Set(float64(duration) / float64(time.Millisecond))
-			glog.Infof("Done purging old series data in %v.", duration)
+		case fp := <-memoryFingerprints:
+			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
+			// TODO: Move chunkdesc eviction and archiving here.
+		case fp := <-archivedFingerprints:
+			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
 		}
 	}
+	// Wait until both channels are closed.
+	for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-memoryFingerprints {
+	}
+	for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-archivedFingerprints {
+	}
 }
 
 // purgeSeries purges chunks older than beforeTime from a series. If the series
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 20b48a7738..c3bef565d6 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -44,7 +44,6 @@ func TestLoop(t *testing.T) {
 	o := &MemorySeriesStorageOptions{
 		MemoryEvictionInterval:     100 * time.Millisecond,
 		MemoryRetentionPeriod:      time.Hour,
-		PersistencePurgeInterval:   150 * time.Millisecond,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
 		CheckpointInterval:         250 * time.Millisecond,
@@ -492,7 +491,6 @@ func BenchmarkFuzz(b *testing.B) {
 	o := &MemorySeriesStorageOptions{
 		MemoryEvictionInterval:     time.Second,
 		MemoryRetentionPeriod:      10 * time.Minute,
-		PersistencePurgeInterval:   10 * time.Second,
 		PersistenceRetentionPeriod: time.Hour,
 		PersistenceStoragePath:     directory.Path(),
 		CheckpointInterval:         3 * time.Second,
diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go
index fc61f06816..da0c2f00f3 100644
--- a/storage/local/test_helpers.go
+++ b/storage/local/test_helpers.go
@@ -38,7 +38,6 @@ func NewTestStorage(t testing.TB) (Storage, test.Closer) {
 	o := &MemorySeriesStorageOptions{
 		MemoryEvictionInterval:     time.Minute,
 		MemoryRetentionPeriod:      time.Hour,
-		PersistencePurgeInterval:   time.Hour,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
 		CheckpointInterval:         time.Hour,
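
Note on the new pacing logic: instead of purging everything at once on a purgeTicker, the maintenance loop now visits one fingerprint at a time, and waitForNextFP spaces those visits so that a full sweep over all fingerprints takes roughly a tenth of the retention period, with the per-fingerprint wait clamped between 10ms and 1m. The standalone sketch below is not part of the patch; the helper name perFPWait and the fingerprint counts are made up for illustration. It reproduces the clamping arithmetic for the default -storage.retentionPeriod of 15 days:

package main

import (
	"fmt"
	"time"
)

// perFPWait mirrors the clamping in waitForNextFP from the patch: spread a
// full sweep over a tenth of the retention period, but never wait less than
// 10ms or more than 1m between two fingerprints.
func perFPWait(retention time.Duration, numberOfFPs int) time.Duration {
	d := time.Minute
	if numberOfFPs != 0 {
		d = retention / time.Duration(numberOfFPs*10)
		if d < 10*time.Millisecond {
			d = 10 * time.Millisecond
		}
		if d > time.Minute {
			d = time.Minute
		}
	}
	return d
}

func main() {
	retention := 15 * 24 * time.Hour // default -storage.retentionPeriod
	for _, n := range []int{0, 1000, 100000, 10000000} {
		fmt.Printf("%9d fingerprints -> wait %v between fingerprints\n", n, perFPWait(retention, n))
	}
}

With the 15d default, even ten million series keeps the per-fingerprint pause above the 10ms floor, so a sweep completes in roughly a tenth of the retention period, about 1.5 days.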
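
One more detail worth noting: after break loop, the main loop keeps receiving from memoryFingerprints and archivedFingerprints until both are closed. The sweep goroutines select on s.loopStopping while sending, so the drain simply waits for them to notice the stop signal and close their channels before loop() returns and its deferred function closes s.loopStopped. The patch spells the drain out with three-clause for loops; the toy program below, which is not from the patch, shows the same receive-until-closed idiom on a throwaway channel:

package main

import "fmt"

func main() {
	ch := make(chan int)
	stop := make(chan struct{})

	go func() {
		defer close(ch) // producer closes the channel once it sees stop
		for i := 0; ; i++ {
			select {
			case ch <- i:
			case <-stop:
				return
			}
		}
	}()

	<-ch // consume a little, then decide to shut down
	close(stop)

	// Drain until the producer has observed stop and closed ch; this has
	// the same effect as the patch's "for channelStillOpen := ..." loops.
	for range ch {
	}
	fmt.Println("producer finished, safe to return")
}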