From bb42cc2e2dcabbdd7360aa64ba36bfaeb71f494f Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Thu, 13 Nov 2014 20:50:25 +0100 Subject: [PATCH] Evict based on memory pressure. Evict recently used chunks last. Change-Id: Ie6168f0cdb3917bdc63b6fe15585dd70c1e42afe --- main.go | 11 +- storage/local/chunk.go | 42 +++--- storage/local/preload.go | 2 +- storage/local/series.go | 127 ++++------------- storage/local/storage.go | 247 +++++++++++++++++++++++----------- storage/local/storage_test.go | 56 +++----- storage/local/test_helpers.go | 3 +- 7 files changed, 246 insertions(+), 242 deletions(-) diff --git a/main.go b/main.go index e8024cb43..183e770bb 100644 --- a/main.go +++ b/main.go @@ -42,18 +42,18 @@ const deletionBatchSize = 100 // Commandline flags. var ( - configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") - metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.") + configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.") + metricsStoragePath = flag.String("storage.path", "/tmp/metrics", "Base path for metrics storage.") + remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.") remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.") samplesQueueCapacity = flag.Int("storage.queue.samplesCapacity", 4096, "The size of the unwritten samples queue.") - memoryEvictionInterval = flag.Duration("storage.memory.evictionInterval", 15*time.Minute, "The period at which old data is evicted from memory.") - memoryRetentionPeriod = flag.Duration("storage.memory.retentionPeriod", time.Hour, "The period of time to retain in memory during evictions.") + numMemoryChunks = flag.Int("storage.memoryChunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") storageRetentionPeriod = flag.Duration("storage.retentionPeriod", 15*24*time.Hour, "The period of time to retain in storage.") @@ -115,8 +115,7 @@ func NewPrometheus() *prometheus { notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity) o := &local.MemorySeriesStorageOptions{ - MemoryEvictionInterval: *memoryEvictionInterval, - MemoryRetentionPeriod: *memoryRetentionPeriod, + MemoryChunks: *numMemoryChunks, PersistenceStoragePath: *metricsStoragePath, PersistenceRetentionPeriod: *storageRetentionPeriod, CheckpointInterval: *checkpointInterval, diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 4e811bf20..0efbe0f91 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -14,6 +14,7 @@ package local import ( + "container/list" "io" "sync" "sync/atomic" @@ -27,9 +28,13 @@ type chunkDesc struct { sync.Mutex chunk chunk refCount int - evict bool chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted. chunkLastTime clientmodel.Timestamp // Used if chunk is evicted. + + // evictListElement is nil if the chunk is not in the evict list. + // evictListElement is _not_ protected by the chunkDesc mutex. + // It must only be touched by the evict list handler in memorySeriesStorage. 
+ evictListElement *list.Element } // newChunkDesc creates a new chunkDesc pointing to the given chunk. The @@ -48,14 +53,18 @@ func (cd *chunkDesc) add(s *metric.SamplePair) []chunk { return cd.chunk.add(s) } -func (cd *chunkDesc) pin() { +func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() + if cd.refCount == 0 { + // Remove ourselves from the evict list. + evictRequests <- evictRequest{cd, false} + } cd.refCount++ } -func (cd *chunkDesc) unpin() { +func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -63,8 +72,9 @@ func (cd *chunkDesc) unpin() { panic("cannot unpin already unpinned chunk") } cd.refCount-- - if cd.refCount == 0 && cd.evict { - cd.evictNow() + if cd.refCount == 0 { + // Add ourselves to the back of the evict list. + evictRequests <- evictRequest{cd, true} } } @@ -115,36 +125,28 @@ func (cd *chunkDesc) setChunk(c chunk) { if cd.chunk != nil { panic("chunk already set") } - cd.evict = false cd.chunk = c } -// evictOnUnpin evicts the chunk once unpinned. If it is not pinned when this -// method is called, it evicts the chunk immediately and returns true. If the -// chunk is already evicted when this method is called, it returns true, too. -func (cd *chunkDesc) evictOnUnpin() bool { +// maybeEvict evicts the chunk if the refCount is 0. It returns wether the chunk +// is now evicted, which includes the case that the chunk was evicted even +// before this method was called. +func (cd *chunkDesc) maybeEvict() bool { cd.Lock() defer cd.Unlock() if cd.chunk == nil { - // Already evicted. return true } - cd.evict = true - if cd.refCount == 0 { - cd.evictNow() - return true + if cd.refCount != 0 { + return false } - return false -} - -// evictNow is an internal helper method. -func (cd *chunkDesc) evictNow() { cd.chunkFirstTime = cd.chunk.firstTime() cd.chunkLastTime = cd.chunk.lastTime() cd.chunk = nil chunkOps.WithLabelValues(evict).Inc() atomic.AddInt64(&numMemChunks, -1) + return true } // chunk is the interface for all chunks. Chunks are generally not diff --git a/storage/local/preload.go b/storage/local/preload.go index c5669e6b0..1e1cd6c96 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -104,7 +104,7 @@ func (p *memorySeriesPreloader) Close() { // over the pinnedChunkDescs to a manager that will delay the unpinning // based on time and memory pressure. for _, cd := range p.pinnedChunkDescs { - cd.unpin() + cd.unpin(p.storage.evictRequests) } chunkOps.WithLabelValues(unpin).Add(float64(len(p.pinnedChunkDescs))) diff --git a/storage/local/series.go b/storage/local/series.go index 886914b9e..8910cbd6f 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -225,90 +225,26 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []* return chunkDescsToPersist } -// evictOlderThan marks for eviction all chunks whose latest sample is older -// than the given timestamp. It returns allEvicted as true if all chunks in the -// series were immediately evicted (i.e. all chunks are older than the -// timestamp, and none of the chunks was pinned). -// -// The method also evicts chunkDescs if there are chunkDescEvictionFactor times -// more chunkDescs in the series than unevicted chunks. (The number of unevicted -// chunks is considered the number of chunks between (and including) the oldest -// chunk that could not be evicted immediately and the newest chunk in the -// series, even if chunks in between were evicted.) 
-// -// Special considerations for the head chunk: If it has not been scheduled to be -// persisted yet but is old enough for eviction, this method returns a pointer -// to the descriptor of the head chunk to be persisted. (Otherwise, the method -// returns nil.) The caller is then responsible for persisting the head -// chunk. The internal state of this memorySeries is already set accordingly by -// this method. Calling evictOlderThan for a series with a non-persisted head -// chunk that is old enough for eviction will never evict all chunks -// immediately, even if no chunk is pinned for other reasons, because the head -// chunk is not persisted yet. A series old enough for archiving will require at -// least two eviction runs to become ready for archiving: In the first run, its -// head chunk is requested to be persisted. The next call of evictOlderThan will -// then return true, provided that the series hasn't received new samples in the -// meantime, the head chunk has now been persisted, and no chunk is pinned for -// other reasons. -// -// The caller must have locked the fingerprint of the series. -func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool, headChunkToPersist *chunkDesc) { - allEvicted = true - iOldestNotEvicted := -1 - - defer func() { - // Evict chunkDescs if there are chunkDescEvictionFactor times - // more than non-evicted chunks and we are not going to archive - // the whole series anyway. - if iOldestNotEvicted != -1 { - lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) - if lenToKeep < len(s.chunkDescs) { - s.savedFirstTime = s.firstTime() - lenEvicted := len(s.chunkDescs) - lenToKeep - s.chunkDescsOffset += lenEvicted - chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) - atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted)) - s.chunkDescs = append( - make([]*chunkDesc, 0, lenToKeep), - s.chunkDescs[lenEvicted:]..., - ) - } - } - }() - - // For now, always drop the entire range from oldest to t. - for i, cd := range s.chunkDescs { - if !cd.lastTime().Before(t) { - if iOldestNotEvicted == -1 { - iOldestNotEvicted = i - } - return false, nil - } - if cd.isEvicted() { - continue - } - if !s.headChunkPersisted && i == len(s.chunkDescs)-1 { - // This is a non-persisted head chunk that is old enough - // for eviction. Request it to be persisted: - headChunkToPersist = cd - s.headChunkPersisted = true - // Since we cannot modify the head chunk from now on, we - // don't need to bother with cloning anymore. - s.headChunkUsedByIterator = false - } - if !cd.evictOnUnpin() { - if iOldestNotEvicted == -1 { - iOldestNotEvicted = i - } - allEvicted = false - } +// evictChunkDescs evicts chunkDescs if there are chunkDescEvictionFactor times +// more than non-evicted chunks. iOldestNotEvicted is the index within the +// current chunkDescs of the oldest chunk that is not evicted. +func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { + lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) + if lenToKeep < len(s.chunkDescs) { + s.savedFirstTime = s.firstTime() + lenEvicted := len(s.chunkDescs) - lenToKeep + s.chunkDescsOffset += lenEvicted + chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) + atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted)) + s.chunkDescs = append( + make([]*chunkDesc, 0, lenToKeep), + s.chunkDescs[lenEvicted:]..., + ) } - return allEvicted, headChunkToPersist } -// purgeOlderThan removes chunkDescs older than t. 
It also evicts the chunks of -// those chunkDescs (although that's probably not even necessary). It returns -// the number of purged chunkDescs and true if all chunkDescs have been purged. +// purgeOlderThan removes chunkDescs older than t. It returns the number of +// purged chunkDescs and true if all chunkDescs have been purged. // // The caller must have locked the fingerprint of the series. func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) { @@ -318,7 +254,6 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) { keepIdx = i break } - s.chunkDescs[i].evictOnUnpin() } if keepIdx > 0 { s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...) @@ -328,13 +263,13 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) { } // preloadChunks is an internal helper method. -func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDesc, error) { +func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([]*chunkDesc, error) { loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) for _, idx := range indexes { cd := s.chunkDescs[idx] pinnedChunkDescs = append(pinnedChunkDescs, cd) - cd.pin() // Have to pin everything first to prevent concurrent evictOnUnpin later. + cd.pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading. if cd.isEvicted() { loadIndexes = append(loadIndexes, idx) } @@ -346,11 +281,12 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory") } fp := s.metric.Fingerprint() - chunks, err := p.loadChunks(fp, loadIndexes, s.chunkDescsOffset) + // TODO: Remove law-of-Demeter violation? + chunks, err := mss.persistence.loadChunks(fp, loadIndexes, s.chunkDescsOffset) if err != nil { // Unpin the chunks since we won't return them as pinned chunks now. for _, cd := range pinnedChunkDescs { - cd.unpin() + cd.unpin(mss.evictRequests) } chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) return nil, err @@ -400,14 +336,15 @@ func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p *persisten // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( from clientmodel.Timestamp, through clientmodel.Timestamp, - fp clientmodel.Fingerprint, p *persistence, + fp clientmodel.Fingerprint, mss *memorySeriesStorage, ) ([]*chunkDesc, error) { firstChunkDescTime := clientmodel.Timestamp(math.MaxInt64) if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() } if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { - cds, err := p.loadChunkDescs(fp, firstChunkDescTime) + // TODO: Remove law-of-demeter violation? + cds, err := mss.persistence.loadChunkDescs(fp, firstChunkDescTime) if err != nil { return nil, err } @@ -438,7 +375,7 @@ func (s *memorySeries) preloadChunksForRange( for i := fromIdx; i <= throughIdx; i++ { pinIndexes = append(pinIndexes, i) } - return s.preloadChunks(pinIndexes, p) + return s.preloadChunks(pinIndexes, mss) } // newIterator returns a new SeriesIterator. The caller must have locked the @@ -467,18 +404,6 @@ func (s *memorySeries) head() *chunkDesc { return s.chunkDescs[len(s.chunkDescs)-1] } -// values returns all values in the series. 
The caller must have locked the -// fingerprint of the memorySeries. -func (s *memorySeries) values() metric.Values { - var values metric.Values - for _, cd := range s.chunkDescs { - for sample := range cd.chunk.values() { - values = append(values, *sample) - } - } - return values -} - // firstTime returns the timestamp of the first sample in the series. The caller // must have locked the fingerprint of the memorySeries. func (s *memorySeries) firstTime() clientmodel.Timestamp { diff --git a/storage/local/storage.go b/storage/local/storage.go index a1fc88b3e..5b86bceca 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -15,6 +15,7 @@ package local import ( + "container/list" "sync/atomic" "time" @@ -27,13 +28,17 @@ import ( ) const ( - persistQueueCap = 1024 - chunkLen = 1024 + persistQueueCap = 1024 + evictRequestsCap = 1024 + chunkLen = 1024 // See waitForNextFP. fpMaxWaitDuration = 10 * time.Second fpMinWaitDuration = 5 * time.Millisecond // ~ hard disk seek time. fpMaxSweepTime = 6 * time.Hour + + maxEvictInterval = time.Minute + headChunkTimeout = time.Hour // Close head chunk if not touched for that long. ) type storageState uint @@ -49,12 +54,17 @@ type persistRequest struct { chunkDesc *chunkDesc } +type evictRequest struct { + cd *chunkDesc + evict bool +} + type memorySeriesStorage struct { fpLocker *fingerprintLocker fpToSeries *seriesMap loopStopping, loopStopped chan struct{} - evictInterval, evictAfter time.Duration + maxMemoryChunks int purgeAfter time.Duration checkpointInterval time.Duration @@ -62,22 +72,25 @@ type memorySeriesStorage struct { persistStopped chan struct{} persistence *persistence - persistLatency prometheus.Summary - persistErrors *prometheus.CounterVec - persistQueueLength prometheus.Gauge - numSeries prometheus.Gauge - seriesOps *prometheus.CounterVec - ingestedSamplesCount prometheus.Counter - invalidPreloadRequestsCount prometheus.Counter - purgeDuration, evictDuration prometheus.Gauge + evictList *list.List + evictRequests chan evictRequest + evictStopping, evictStopped chan struct{} + + persistLatency prometheus.Summary + persistErrors *prometheus.CounterVec + persistQueueLength prometheus.Gauge + numSeries prometheus.Gauge + seriesOps *prometheus.CounterVec + ingestedSamplesCount prometheus.Counter + invalidPreloadRequestsCount prometheus.Counter + purgeDuration prometheus.Gauge } // MemorySeriesStorageOptions contains options needed by // NewMemorySeriesStorage. It is not safe to leave any of those at their zero // values. type MemorySeriesStorageOptions struct { - MemoryEvictionInterval time.Duration // How often to check for memory eviction. - MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory. + MemoryChunks int // How many chunks to keep in memory. PersistenceStoragePath string // Location of persistence files. PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. 
@@ -111,8 +124,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { loopStopping: make(chan struct{}), loopStopped: make(chan struct{}), - evictInterval: o.MemoryEvictionInterval, - evictAfter: o.MemoryRetentionPeriod, + maxMemoryChunks: o.MemoryChunks, purgeAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, @@ -120,6 +132,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { persistStopped: make(chan struct{}), persistence: p, + evictList: list.New(), + evictRequests: make(chan evictRequest, evictRequestsCap), + evictStopping: make(chan struct{}), + evictStopped: make(chan struct{}), + persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, @@ -163,23 +180,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { Name: "invalid_preload_requests_total", Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", }), - purgeDuration: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "purge_duration_milliseconds", - Help: "The duration of the last storage purge iteration in milliseconds.", - }), - evictDuration: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "evict_duration_milliseconds", - Help: "The duration of the last memory eviction iteration in milliseconds.", - }), }, nil } // Start implements Storage. func (s *memorySeriesStorage) Start() { + go s.handleEvictList() go s.handlePersistQueue() go s.loop() } @@ -187,14 +193,19 @@ func (s *memorySeriesStorage) Start() { // Stop implements Storage. func (s *memorySeriesStorage) Stop() error { glog.Info("Stopping local storage...") + glog.Info("Stopping maintenance loop...") close(s.loopStopping) <-s.loopStopped - glog.Info("Stopping persist loop...") + glog.Info("Stopping persist queue...") close(s.persistQueue) <-s.persistStopped + glog.Info("Stopping chunk eviction...") + close(s.evictStopping) + <-s.evictStopped + // One final checkpoint of the series map and the head chunks. if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil { return err @@ -407,7 +418,83 @@ func (s *memorySeriesStorage) preloadChunksForRange( return nil, nil } } - return series.preloadChunksForRange(from, through, fp, s.persistence) + return series.preloadChunksForRange(from, through, fp, s) +} + +func (s *memorySeriesStorage) handleEvictList() { + ticker := time.NewTicker(maxEvictInterval) + count := 0 +loop: + for { + // To batch up evictions a bit, this tries evictions at least + // once per evict interval, but earlier if the number of evict + // requests with evict==true that has happened since the last + // evict run is more than maxMemoryChunks/1000. + select { + case req := <-s.evictRequests: + if req.evict { + req.cd.evictListElement = s.evictList.PushBack(req.cd) + count++ + if count > s.maxMemoryChunks/1000 { + s.maybeEvict() + count = 0 + } + } else { + if req.cd.evictListElement != nil { + s.evictList.Remove(req.cd.evictListElement) + req.cd.evictListElement = nil + } + } + case <-ticker.C: + if s.evictList.Len() > 0 { + s.maybeEvict() + } + case <-s.evictStopping: + break loop + } + } + ticker.Stop() + glog.Info("Chunk eviction stopped.") + close(s.evictStopped) +} + +// maybeEvict is a local helper method. Must only be called by handleEvictList. 
+func (s *memorySeriesStorage) maybeEvict() { + numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks + if numChunksToEvict <= 0 { + return + } + chunkDescsToEvict := make([]*chunkDesc, numChunksToEvict) + for i := range chunkDescsToEvict { + e := s.evictList.Front() + if e == nil { + break + } + cd := e.Value.(*chunkDesc) + cd.evictListElement = nil + chunkDescsToEvict[i] = cd + s.evictList.Remove(e) + } + // Do the actual eviction in a goroutine as we might otherwise deadlock, + // in the following way: A chunk was unpinned completely and therefore + // scheduled for eviction. At the time we actually try to evict it, + // another goroutine is pinning the chunk. The pinning goroutine has + // currently locked the chunk and tries to send the evict request (to + // remove the chunk from the evict list) to the evictRequests + // channel. The send blocks because evictRequests is full. However, the + // goroutine that is supposed to empty the channel is wating for the + // chunkDesc lock to try to evict the chunk. + go func() { + for _, cd := range chunkDescsToEvict { + if cd == nil { + break + } + cd.maybeEvict() + // We don't care if the eviction succeeds. If the chunk + // was pinned in the meantime, it will be added to the + // evict list once it gets unpinned again. + } + }() } func (s *memorySeriesStorage) handlePersistQueue() { @@ -430,10 +517,10 @@ func (s *memorySeriesStorage) handlePersistQueue() { s.persistence.setDirty(true) continue } - req.chunkDesc.unpin() + req.chunkDesc.unpin(s.evictRequests) chunkOps.WithLabelValues(persistAndUnpin).Inc() } - glog.Info("Persist loop stopped.") + glog.Info("Persist queue drained and stopped.") close(s.persistStopped) } @@ -471,11 +558,9 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool { } func (s *memorySeriesStorage) loop() { - evictTicker := time.NewTicker(s.evictInterval) checkpointTicker := time.NewTicker(s.checkpointInterval) defer func() { - evictTicker.Stop() checkpointTicker.Stop() glog.Info("Maintenance loop stopped.") close(s.loopStopped) @@ -550,47 +635,9 @@ loop: break loop case <-checkpointTicker.C: s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) - case <-evictTicker.C: - // TODO: Change this to be based on number of chunks in memory. - glog.Info("Evicting chunks...") - begin := time.Now() - - for m := range s.fpToSeries.iter() { - select { - case <-s.loopStopping: - glog.Info("Interrupted evicting chunks.") - break loop - default: - // Keep going. - } - s.fpLocker.Lock(m.fp) - allEvicted, headChunkToPersist := m.series.evictOlderThan( - clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.evictAfter), - ) - if allEvicted { - s.fpToSeries.del(m.fp) - s.numSeries.Dec() - if err := s.persistence.archiveMetric( - m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(), - ); err != nil { - glog.Errorf("Error archiving metric %v: %v", m.series.metric, err) - } else { - s.seriesOps.WithLabelValues(archive).Inc() - } - } - s.fpLocker.Unlock(m.fp) - // Queue outside of lock! - if headChunkToPersist != nil { - s.persistQueue <- persistRequest{m.fp, headChunkToPersist} - } - } - - duration := time.Since(begin) - s.evictDuration.Set(float64(duration) / float64(time.Millisecond)) - glog.Infof("Done evicting chunks in %v.", duration) case fp := <-memoryFingerprints: s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter)) - // TODO: Move chunkdesc eviction, head chunk closing, and archiving here. 
+ s.maintainSeries(fp) s.seriesOps.WithLabelValues(memoryMaintenance).Inc() case fp := <-archivedFingerprints: s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter)) @@ -604,6 +651,56 @@ loop: } } +// maintainSeries closes the head chunk if not touched in a while. It archives a +// series if all chunks are evicted. It evicts chunkDescs if there are too many. +func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) { + var headChunkToPersist *chunkDesc + s.fpLocker.Lock(fp) + defer func() { + s.fpLocker.Unlock(fp) + // Queue outside of lock! + if headChunkToPersist != nil { + s.persistQueue <- persistRequest{fp, headChunkToPersist} + } + }() + + series, ok := s.fpToSeries.get(fp) + if !ok { + return + } + iOldestNotEvicted := -1 + for i, cd := range series.chunkDescs { + if !cd.isEvicted() { + iOldestNotEvicted = i + break + } + } + + // Archive if all chunks are evicted. + if iOldestNotEvicted == -1 { + s.fpToSeries.del(fp) + s.numSeries.Dec() + if err := s.persistence.archiveMetric( + fp, series.metric, series.firstTime(), series.lastTime(), + ); err != nil { + glog.Errorf("Error archiving metric %v: %v", series.metric, err) + } else { + s.seriesOps.WithLabelValues(archive).Inc() + } + return + } + // If we are here, the series is not archived, so check for chunkDesc + // eviction next and then if the head chunk needs to be persisted. + series.evictChunkDescs(iOldestNotEvicted) + if !series.headChunkPersisted && time.Now().Sub(series.head().firstTime().Time()) > headChunkTimeout { + series.headChunkPersisted = true + // Since we cannot modify the head chunk from now on, we + // don't need to bother with cloning anymore. + series.headChunkUsedByIterator = false + headChunkToPersist = series.head() + } +} + // purgeSeries purges chunks older than beforeTime from a series. If the series // contains no chunks after the purge, it is dropped entirely. func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) { @@ -635,7 +732,6 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime } return } - // Deal with archived series. 
has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) if err != nil { @@ -648,6 +744,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime } newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime) + glog.Infoln("DEBUG:", newFirstTime, allDropped) if err != nil { glog.Error("Error purging persisted chunks: ", err) } @@ -685,8 +782,6 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.seriesOps.Describe(ch) ch <- s.ingestedSamplesCount.Desc() ch <- s.invalidPreloadRequestsCount.Desc() - ch <- s.purgeDuration.Desc() - ch <- s.evictDuration.Desc() ch <- persistQueueCapDesc @@ -705,8 +800,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { s.seriesOps.Collect(ch) ch <- s.ingestedSamplesCount ch <- s.invalidPreloadRequestsCount - ch <- s.purgeDuration - ch <- s.evictDuration ch <- persistQueueCapGauge diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 2f76c1482..182a1f5e0 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -20,6 +20,7 @@ import ( "testing/quick" "time" + "github.com/golang/glog" clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility/test" @@ -42,8 +43,7 @@ func TestLoop(t *testing.T) { directory := test.NewTemporaryDirectory("test_storage", t) defer directory.Close() o := &MemorySeriesStorageOptions{ - MemoryEvictionInterval: 100 * time.Millisecond, - MemoryRetentionPeriod: time.Hour, + MemoryChunks: 50, PersistenceRetentionPeriod: 24 * 7 * time.Hour, PersistenceStoragePath: directory.Path(), CheckpointInterval: 250 * time.Millisecond, @@ -73,16 +73,28 @@ func TestChunk(t *testing.T) { for m := range s.(*memorySeriesStorage).fpToSeries.iter() { s.(*memorySeriesStorage).fpLocker.Lock(m.fp) - for i, v := range m.series.values() { + + var values metric.Values + for _, cd := range m.series.chunkDescs { + if cd.isEvicted() { + continue + } + for sample := range cd.chunk.values() { + values = append(values, *sample) + } + } + + for i, v := range values { if samples[i].Timestamp != v.Timestamp { - t.Fatalf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp) + t.Errorf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp) } if samples[i].Value != v.Value { - t.Fatalf("%d. Got %v; want %v", i, v.Value, samples[i].Value) + t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value) } } s.(*memorySeriesStorage).fpLocker.Unlock(m.fp) } + glog.Info("test done, closing") } func TestGetValueAtTime(t *testing.T) { @@ -362,37 +374,12 @@ func TestEvictAndPurgeSeries(t *testing.T) { t.Fatal("could not find series") } - // Evict everything except head chunk. - allEvicted, headChunkToPersist := series.evictOlderThan(1998) - // Head chunk not yet old enough, should get false, false: - if allEvicted { - t.Error("allEvicted with head chunk not yet old enough") - } - if headChunkToPersist != nil { - t.Error("persistHeadChunk is not nil although head chunk is not old enough") - } - // Evict everything. - allEvicted, headChunkToPersist = series.evictOlderThan(10000) - // Since the head chunk is not yet persisted, we should get false, true: - if allEvicted { - t.Error("allEvicted with head chuk not yet persisted") - } - if headChunkToPersist == nil { - t.Error("headChunkToPersist is nil although head chunk is old enough") - } - // Persist head chunk as requested. + // Persist head chunk so we can safely archive. 
+ series.headChunkPersisted = true ms.persistQueue <- persistRequest{fp, series.head()} time.Sleep(time.Second) // Give time for persisting to happen. - allEvicted, headChunkToPersist = series.evictOlderThan(10000) - // Now we should really see everything gone. - if !allEvicted { - t.Error("not allEvicted") - } - if headChunkToPersist != nil { - t.Error("headChunkToPersist is not nil although already persisted") - } - // Now archive as it would usually be done in the evictTicker loop. + // Archive metrics. ms.fpToSeries.del(fp) if err := ms.persistence.archiveMetric( fp, series.metric, series.firstTime(), series.lastTime(), @@ -491,8 +478,7 @@ func BenchmarkFuzz(b *testing.B) { directory := test.NewTemporaryDirectory("test_storage", b) defer directory.Close() o := &MemorySeriesStorageOptions{ - MemoryEvictionInterval: time.Second, - MemoryRetentionPeriod: 10 * time.Minute, + MemoryChunks: 100, PersistenceRetentionPeriod: time.Hour, PersistenceStoragePath: directory.Path(), CheckpointInterval: 3 * time.Second, diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index da0c2f00f..7e4538387 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -36,8 +36,7 @@ func (t *testStorageCloser) Close() { func NewTestStorage(t testing.TB) (Storage, test.Closer) { directory := test.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ - MemoryEvictionInterval: time.Minute, - MemoryRetentionPeriod: time.Hour, + MemoryChunks: 1000000, PersistenceRetentionPeriod: 24 * 7 * time.Hour, PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Hour,
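
The core of the change above is the hand-off between chunkDesc pinning and the evict list owned by handleEvictList in memorySeriesStorage: unpinning a chunk appends it to the back of an LRU list via the evictRequests channel, pinning removes it again, and the list is drained from the front once too many chunks are kept in memory. What follows is a condensed, standalone sketch of that protocol, not the actual Prometheus code: the names (sketchChunk, evictReq, the handleEvictList signature, maxUnpinned) are illustrative, the limit is checked against the length of the LRU list rather than the global numMemChunks counter, and all metrics, persistence, and chunkDesc bookkeeping are left out.

// Sketch of the pin/unpin -> evict-list protocol introduced by this patch.
package main

import (
	"container/list"
	"fmt"
	"sync"
)

type sketchChunk struct {
	sync.Mutex
	id       int
	refCount int
	data     []byte        // Stands in for the real chunk payload.
	lruElem  *list.Element // Only touched by the evict-list goroutine.
}

type evictReq struct {
	c   *sketchChunk
	add bool // true: append to LRU tail; false: remove from LRU.
}

// pin removes the chunk from eviction candidacy while it is in use.
func (c *sketchChunk) pin(reqs chan<- evictReq) {
	c.Lock()
	defer c.Unlock()
	if c.refCount == 0 {
		reqs <- evictReq{c, false}
	}
	c.refCount++
}

// unpin makes the chunk an eviction candidate again once unused.
func (c *sketchChunk) unpin(reqs chan<- evictReq) {
	c.Lock()
	defer c.Unlock()
	c.refCount--
	if c.refCount == 0 {
		reqs <- evictReq{c, true}
	}
}

// handleEvictList is the single owner of the LRU list, so lruElem needs no
// locking. It evicts least recently unpinned chunks from the front once more
// than maxUnpinned chunks are on the list.
func handleEvictList(reqs <-chan evictReq, maxUnpinned int, done chan<- struct{}) {
	lru := list.New()
	for req := range reqs {
		if req.add {
			req.c.lruElem = lru.PushBack(req.c)
		} else if req.c.lruElem != nil {
			lru.Remove(req.c.lruElem)
			req.c.lruElem = nil
		}
		for lru.Len() > maxUnpinned {
			e := lru.Front()
			c := e.Value.(*sketchChunk)
			lru.Remove(e)
			c.lruElem = nil
			c.data = nil // "Evict" the payload.
			fmt.Printf("evicted chunk %d\n", c.id)
		}
	}
	close(done)
}

func main() {
	reqs := make(chan evictReq, 16)
	done := make(chan struct{})
	go handleEvictList(reqs, 2, done) // Keep at most 2 unpinned chunks.

	for i := 0; i < 5; i++ {
		c := &sketchChunk{id: i, data: make([]byte, 1024)}
		c.pin(reqs)   // In use while being filled.
		c.unpin(reqs) // Becomes an eviction candidate.
	}
	close(reqs)
	<-done
}

Unlike this sketch, the patch performs the actual eviction in a goroutine spawned by memorySeriesStorage.maybeEvict, because the list handler could otherwise deadlock with a pinning goroutine that holds the chunkDesc lock while blocking on a full evictRequests channel; the sketch gets away with evicting inline only because its handler never takes a chunk's lock.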
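
maintainSeries also shrinks a series' chunkDesc slice through evictChunkDescs once enough chunks at the front have been evicted. As a worked example of that bookkeeping, assuming chunkDescEvictionFactor is 10 (the factor is defined elsewhere in series.go and not shown in this patch):

	len(s.chunkDescs) = 100, iOldestNotEvicted = 95
	lenToKeep  = 10 * (100 - 95) = 50
	lenEvicted = 100 - 50        = 50

The oldest 50 chunkDescs are dropped from memory and chunkDescsOffset grows by 50; if a later query reaches that far back, preloadChunksForRange reloads them via persistence.loadChunkDescs.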