diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 809775ed7..ad1db59d9 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -54,6 +54,10 @@ var cfg = struct { alertmanagerURLs stringset prometheusURL string + + // Deprecated storage flags, kept for backwards compatibility. + deprecatedMemoryChunks uint64 + deprecatedMaxChunksToPersist uint64 }{ alertmanagerURLs: stringset{}, } @@ -145,17 +149,21 @@ func init() { &cfg.storage.PersistenceStoragePath, "storage.local.path", "data", "Base path for metrics storage.", ) - cfg.fs.IntVar( - &cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024, - "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.", + cfg.fs.Uint64Var( + &cfg.storage.TargetHeapSize, "storage.local.target-heap-size", 2*1024*1024*1024, + "The metrics storage attempts to limit its own memory usage such that the total heap size approaches this value. Note that this is not a hard limit. Actual heap size might be temporarily or permanently higher for a variety of reasons. The default value is a relatively safe setting to not use more than 3 GiB physical memory.", + ) + cfg.fs.Uint64Var( + &cfg.deprecatedMemoryChunks, "storage.local.memory-chunks", 0, + "Deprecated. If set, -storage.local.target-heap-size will be set to this value times 3072.", ) cfg.fs.DurationVar( &cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.", ) - cfg.fs.IntVar( - &cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024, - "How many chunks can be waiting for persistence before sample ingestion will be throttled. Many chunks waiting to be persisted will increase the checkpoint size.", + cfg.fs.Uint64Var( + &cfg.deprecatedMaxChunksToPersist, "storage.local.max-chunks-to-persist", 0, + "Deprecated. This flag has no effect anymore.", ) cfg.fs.DurationVar( &cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute, @@ -276,6 +284,10 @@ func parse(args []string) error { // don't expose it as a separate flag but set it here. cfg.storage.HeadChunkTimeout = promql.StalenessDelta + if cfg.storage.TargetHeapSize < 1024*1024 { + return fmt.Errorf("target heap size smaller than %d: %d", 1024*1024, cfg.storage.TargetHeapSize) + } + if err := parsePrometheusURL(); err != nil { return err } @@ -292,6 +304,15 @@ func parse(args []string) error { } } + // Deal with deprecated storage flags. + if cfg.deprecatedMaxChunksToPersist > 0 { + log.Warn("Flag -storage.local.max-chunks-to-persist is deprecated. It has no effect.") + } + if cfg.deprecatedMemoryChunks > 0 { + cfg.storage.TargetHeapSize = cfg.deprecatedMemoryChunks * 3072 + log.Warnf("Flag -storage.local.memory-chunks is deprecated. 
Its value %d is used to override -storage.local.target-heap-size to %d.", cfg.deprecatedMemoryChunks, cfg.storage.TargetHeapSize) + } + return nil } diff --git a/storage/local/chunk/instrumentation.go b/storage/local/chunk/instrumentation.go index 2a67fb5bd..4dd3231e4 100644 --- a/storage/local/chunk/instrumentation.go +++ b/storage/local/chunk/instrumentation.go @@ -83,18 +83,8 @@ func init() { prometheus.MustRegister(NumMemDescs) } -var ( - // NumMemChunks is the total number of chunks in memory. This is a - // global counter, also used internally, so not implemented as - // metrics. Collected in MemorySeriesStorage.Collect. - // TODO(beorn7): As it is used internally, it is actually very bad style - // to have it as a global variable. - NumMemChunks int64 - - // NumMemChunksDesc is the metric descriptor for the above. - NumMemChunksDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "memory_chunks"), - "The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).", - nil, nil, - ) -) +// NumMemChunks is the total number of chunks in memory. This is a global +// counter, also used internally, so not implemented as metrics. Collected in +// MemorySeriesStorage. +// TODO(beorn7): Having this as an exported global variable is really bad. +var NumMemChunks int64 diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 31c82eece..33a8195a3 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -176,7 +176,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) { // Try to drop one chunk, which must be prevented by the shrink // ratio. Since we do not pass in any chunks to persist, the offset // should be the number of chunks in the file. - for fp, _ := range fpToChunks { + for fp := range fpToChunks { firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, nil) if err != nil { t.Fatal(err) diff --git a/storage/local/storage.go b/storage/local/storage.go index f457fdf8a..b711f2839 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -18,7 +18,7 @@ import ( "container/list" "errors" "fmt" - "math" + "runtime" "sort" "sync" "sync/atomic" @@ -41,8 +41,11 @@ const ( fpMaxSweepTime = 6 * time.Hour fpMaxWaitDuration = 10 * time.Second - // See waitForNextFP. - maxEvictInterval = time.Minute + // See handleEvictList. This should be clearly shorter than the usual CG + // interval. On the other hand, each evict check calls ReadMemStats, + // which involves stopping the world (at least up to Go1.8). Hence, + // don't just set this to a very short interval. + evictInterval = time.Second // Constants to control the hysteresis of entering and leaving "rushed // mode". 
In rushed mode, the dirty series count is ignored for @@ -76,24 +79,6 @@ const ( fpOtherMatchThreshold = 10000 ) -var ( - numChunksToPersistDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "chunks_to_persist"), - "The current number of chunks waiting for persistence.", - nil, nil, - ) - maxChunksToPersistDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "max_chunks_to_persist"), - "The maximum number of chunks that can be waiting for persistence before sample ingestion will stop.", - nil, nil, - ) - maxMemChunksDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "max_memory_chunks"), - "The configured maximum number of chunks that can be held in memory", - nil, nil, - ) -) - type quarantineRequest struct { fp model.Fingerprint metric model.Metric @@ -147,12 +132,13 @@ type syncStrategy func() bool // interfacing with a persistence layer to make time series data persistent // across restarts and evictable from memory. type MemorySeriesStorage struct { - // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. + // archiveHighWatermark, chunksToPersist, persistUrgency have to be aligned for atomic operations. archiveHighWatermark model.Time // No archived series has samples after this time. numChunksToPersist int64 // The number of chunks waiting for persistence. - maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. + persistUrgency int32 // Persistence urgency score * 1000, int32 allows atomic operations. rushed bool // Whether the storage is in rushed mode. - rushedMtx sync.Mutex // Protects entering and exiting rushed mode. + rushedMtx sync.Mutex // Protects rushed. + lastNumGC uint32 // To detect if a GC cycle has run. throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker @@ -162,7 +148,7 @@ type MemorySeriesStorage struct { loopStopping, loopStopped chan struct{} logThrottlingStopped chan struct{} - maxMemoryChunks int + targetHeapSize uint64 dropAfter time.Duration headChunkTimeout time.Duration checkpointInterval time.Duration @@ -178,26 +164,28 @@ type MemorySeriesStorage struct { quarantineRequests chan quarantineRequest quarantineStopping, quarantineStopped chan struct{} - persistErrors prometheus.Counter - queuedChunksToPersist prometheus.Counter - numSeries prometheus.Gauge - numHeadChunks prometheus.Gauge - dirtySeries prometheus.Gauge - seriesOps *prometheus.CounterVec - ingestedSamplesCount prometheus.Counter - discardedSamplesCount *prometheus.CounterVec - nonExistentSeriesMatchesCount prometheus.Counter - maintainSeriesDuration *prometheus.SummaryVec - persistenceUrgencyScore prometheus.Gauge - rushedMode prometheus.Gauge + persistErrors prometheus.Counter + queuedChunksToPersist prometheus.Counter + chunksToPersist prometheus.GaugeFunc + memorySeries prometheus.Gauge + headChunks prometheus.Gauge + dirtySeries prometheus.Gauge + seriesOps *prometheus.CounterVec + ingestedSamples prometheus.Counter + discardedSamples *prometheus.CounterVec + nonExistentSeriesMatches prometheus.Counter + memChunks prometheus.GaugeFunc + maintainSeriesDuration *prometheus.SummaryVec + persistenceUrgencyScore prometheus.GaugeFunc + rushedMode prometheus.GaugeFunc + targetHeapSizeBytes prometheus.GaugeFunc } // MemorySeriesStorageOptions contains options needed by // NewMemorySeriesStorage. It is not safe to leave any of those at their zero // values. 
type MemorySeriesStorageOptions struct { - MemoryChunks int // How many chunks to keep in memory. - MaxChunksToPersist int // Max number of chunks waiting to be persisted. + TargetHeapSize uint64 // Desired maximum heap size. PersistenceStoragePath string // Location of persistence files. PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped. HeadChunkTimeout time.Duration // Head chunks idle for at least that long may be closed. @@ -222,15 +210,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage loopStopped: make(chan struct{}), logThrottlingStopped: make(chan struct{}), throttled: make(chan struct{}, 1), - maxMemoryChunks: o.MemoryChunks, + targetHeapSize: o.TargetHeapSize, dropAfter: o.PersistenceRetentionPeriod, headChunkTimeout: o.HeadChunkTimeout, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, archiveHighWatermark: model.Now().Add(-o.HeadChunkTimeout), - maxChunksToPersist: o.MaxChunksToPersist, - evictList: list.New(), evictRequests: make(chan chunk.EvictRequest, evictRequestsCap), evictStopping: make(chan struct{}), @@ -252,13 +238,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage Name: "queued_chunks_to_persist_total", Help: "The total number of chunks queued for persistence.", }), - numSeries: prometheus.NewGauge(prometheus.GaugeOpts{ + memorySeries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "memory_series", Help: "The current number of series in memory.", }), - numHeadChunks: prometheus.NewGauge(prometheus.GaugeOpts{ + headChunks: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "open_head_chunks", @@ -279,13 +265,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage }, []string{opTypeLabel}, ), - ingestedSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{ + ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "ingested_samples_total", Help: "The total number of samples ingested.", }), - discardedSamplesCount: prometheus.NewCounterVec( + discardedSamples: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -294,12 +280,21 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage }, []string{discardReasonLabel}, ), - nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{ + nonExistentSeriesMatches: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "non_existent_series_matches_total", Help: "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.", }), + memChunks: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_chunks", + Help: "The current number of chunks in memory. The number does not include cloned chunks (i.e. 
chunks without a descriptor).", + }, + func() float64 { return float64(atomic.LoadInt64(&chunk.NumMemChunks)) }, + ), maintainSeriesDuration: prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, @@ -309,24 +304,63 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage }, []string{seriesLocationLabel}, ), - persistenceUrgencyScore: prometheus.NewGauge(prometheus.GaugeOpts{ + } + + s.chunksToPersist = prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_to_persist", + Help: "The current number of chunks waiting for persistence.", + }, + func() float64 { + return float64(s.getNumChunksToPersist()) + }, + ) + s.rushedMode = prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rushed_mode", + Help: "1 if the storage is in rushed mode, 0 otherwise.", + }, + func() float64 { + s.rushedMtx.Lock() + defer s.rushedMtx.Unlock() + if s.rushed { + return 1 + } + return 0 + }, + ) + s.persistenceUrgencyScore = prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "persistence_urgency_score", Help: "A score of urgency to persist chunks, 0 is least urgent, 1 most.", - }), - rushedMode: prometheus.NewGauge(prometheus.GaugeOpts{ + }, + func() float64 { + score, _ := s.getPersistenceUrgencyScore() + return score + }, + ) + s.targetHeapSizeBytes = prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "rushed_mode", - Help: "1 if the storage is in rushed mode, 0 otherwise. In rushed mode, the system behaves as if the persistence_urgency_score is 1.", - }), - } + Name: "target_heap_size_bytes", + Help: "The configured target heap size in bytes.", + }, + func() float64 { + return float64(s.targetHeapSize) + }, + ) // Initialize metric vectors. // TODO(beorn7): Rework once we have a utility function for it in client_golang. - s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp) - s.discardedSamplesCount.WithLabelValues(duplicateSample) + s.discardedSamples.WithLabelValues(outOfOrderTimestamp) + s.discardedSamples.WithLabelValues(duplicateSample) s.maintainSeriesDuration.WithLabelValues(maintainInMemory) s.maintainSeriesDuration.WithLabelValues(maintainArchived) s.seriesOps.WithLabelValues(create) @@ -353,7 +387,10 @@ func (s *MemorySeriesStorage) Start() (err error) { case Always: syncStrategy = func() bool { return true } case Adaptive: - syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 } + syncStrategy = func() bool { + _, rushed := s.getPersistenceUrgencyScore() + return !rushed + } default: panic("unknown sync strategy") } @@ -385,7 +422,7 @@ func (s *MemorySeriesStorage) Start() (err error) { for fp := range s.fpToSeries.fpIter() { if series, ok := s.fpToSeries.get(fp); ok { if !series.headChunkClosed { - s.numHeadChunks.Inc() + s.headChunks.Inc() } } } @@ -394,7 +431,7 @@ func (s *MemorySeriesStorage) Start() (err error) { return err } log.Infof("%d series loaded.", s.fpToSeries.length()) - s.numSeries.Set(float64(s.fpToSeries.length())) + s.memorySeries.Set(float64(s.fpToSeries.length())) s.mapper, err = newFPMapper(s.fpToSeries, p) if err != nil { @@ -805,7 +842,7 @@ func (s *MemorySeriesStorage) metricForRange( // we have a chance the archived metric is not in the range. 
has, first, last := s.persistence.hasArchivedMetric(fp) if !has { - s.nonExistentSeriesMatchesCount.Inc() + s.nonExistentSeriesMatches.Inc() return nil, nil, false } if first.After(through) || last.Before(from) { @@ -882,11 +919,11 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error { sample.Value.Equal(series.lastSampleValue) { return nil } - s.discardedSamplesCount.WithLabelValues(duplicateSample).Inc() + s.discardedSamples.WithLabelValues(duplicateSample).Inc() return ErrDuplicateSampleForTimestamp // Caused by the caller. } if sample.Timestamp < series.lastTime { - s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc() + s.discardedSamples.WithLabelValues(outOfOrderTimestamp).Inc() return ErrOutOfOrderSample // Caused by the caller. } completedChunksCount, err := series.add(model.SamplePair{ @@ -897,7 +934,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error { s.quarantineSeries(fp, sample.Metric, err) return err } - s.ingestedSamplesCount.Inc() + s.ingestedSamples.Inc() s.incNumChunksToPersist(completedChunksCount) return nil @@ -905,8 +942,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error { // NeedsThrottling implements Storage. func (s *MemorySeriesStorage) NeedsThrottling() bool { - if s.getNumChunksToPersist() > s.maxChunksToPersist || - float64(atomic.LoadInt64(&chunk.NumMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { + if score, _ := s.getPersistenceUrgencyScore(); score >= 1 { select { case s.throttled <- struct{}{}: default: // Do nothing, signal already pending. @@ -937,19 +973,19 @@ func (s *MemorySeriesStorage) logThrottling() { select { case <-s.throttled: if !timer.Reset(time.Minute) { + score, _ := s.getPersistenceUrgencyScore() log. + With("urgencyScore", score). With("chunksToPersist", s.getNumChunksToPersist()). - With("maxChunksToPersist", s.maxChunksToPersist). With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)). - With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.") } case <-timer.C: + score, _ := s.getPersistenceUrgencyScore() log. + With("urgencyScore", score). With("chunksToPersist", s.getNumChunksToPersist()). - With("maxChunksToPersist", s.maxChunksToPersist). With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)). - With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). Info("Storage does not need throttling anymore.") case <-s.loopStopping: return @@ -994,9 +1030,9 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return nil, err } s.fpToSeries.put(fp, series) - s.numSeries.Inc() + s.memorySeries.Inc() if !series.headChunkClosed { - s.numHeadChunks.Inc() + s.headChunks.Inc() } } return series, nil @@ -1061,23 +1097,16 @@ func (s *MemorySeriesStorage) preloadChunksForInstant( } func (s *MemorySeriesStorage) handleEvictList() { - ticker := time.NewTicker(maxEvictInterval) - count := 0 + // This ticker is supposed to tick at least once per GC cyle. Ideally, + // we would handle the evict list after each finished GC cycle, but I + // don't know of a way to "subscribe" to that kind of event. + ticker := time.NewTicker(evictInterval) for { - // To batch up evictions a bit, this tries evictions at least - // once per evict interval, but earlier if the number of evict - // requests with evict==true that have happened since the last - // evict run is more than maxMemoryChunks/1000. 
select { case req := <-s.evictRequests: if req.Evict { req.Desc.EvictListElement = s.evictList.PushBack(req.Desc) - count++ - if count > s.maxMemoryChunks/1000 { - s.maybeEvict() - count = 0 - } } else { if req.Desc.EvictListElement != nil { s.evictList.Remove(req.Desc.EvictListElement) @@ -1085,9 +1114,7 @@ func (s *MemorySeriesStorage) handleEvictList() { } } case <-ticker.C: - if s.evictList.Len() > 0 { - s.maybeEvict() - } + s.maybeEvict() case <-s.evictStopping: // Drain evictRequests forever in a goroutine to not let // requesters hang. @@ -1106,10 +1133,14 @@ func (s *MemorySeriesStorage) handleEvictList() { // maybeEvict is a local helper method. Must only be called by handleEvictList. func (s *MemorySeriesStorage) maybeEvict() { - numChunksToEvict := int(atomic.LoadInt64(&chunk.NumMemChunks)) - s.maxMemoryChunks + ms := runtime.MemStats{} + runtime.ReadMemStats(&ms) + numChunksToEvict := s.calculatePersistUrgency(&ms) + if numChunksToEvict <= 0 { return } + chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict) for i := range chunkDescsToEvict { e := s.evictList.Front() @@ -1143,6 +1174,118 @@ func (s *MemorySeriesStorage) maybeEvict() { }() } +// calculatePersistUrgency calculates and sets s.persistUrgency. Based on the +// calculation, it returns the number of chunks to evict. The runtime.MemStats +// are passed in here for testability. +// +// The persist urgency is calculated by the following formula: +// +// n(toPersist) MAX( h(nextGC), h(current) ) +// p = MIN( 1, --------------------------- * ---------------------------- ) +// n(toPersist) + n(evictable) h(target) +// +// where: +// +// n(toPersist): Number of chunks waiting for persistence. +// n(evictable): Number of evictable chunks. +// h(nextGC): Heap size at which the next GC will kick in (ms.NextGC). +// h(current): Current heap size (ms.HeapAlloc). +// h(target): Configured target heap size. +// +// Note that the actual value stored in s.persistUrgency is 1000 times the value +// calculated as above to allow using an int32, which supports atomic +// operations. +// +// If no GC has run after the last call of this method, it will always return 0 +// (no reason to try to evict any more chunks before we have seen the effect of +// the previous eviction). It will also not decrease the persist urgency in this +// case (but it will increase the persist urgency if a higher value was calculated). +// +// If a GC has run after the last call of this method, the following cases apply: +// +// - If MAX( h(nextGC), h(current) ) < h(target), simply return 0. Nothing to +// evict if the heap is still small enough. +// +// - Otherwise, if n(evictable) is 0, also return 0, but set the urgency score +// to 1 to signal that we want to evict chunk but have no evictable chunks +// available. +// +// - Otherwise, calulate the number of chunks to evict and return it: +// +// MAX( h(nextGC), h(current) ) - h(target) +// n(toEvict) = MIN( n(evictable), ---------------------------------------- ) +// c +// +// where c is the size of a chunk. +// +// - In the latter case, the persist urgency might be increased. The final value +// is the following: +// +// n(toEvict) +// MAX( p, ------------ ) +// n(evictable) +// +// Broadly speaking, the persist urgency is based on the ratio of the number of +// chunks we want to evict and the number of chunks that are actually +// evictable. 
However, in particular for the case where we don't need to evict +// chunks yet, it also takes into account how close the heap has already grown +// to the configured target size, and how big the pool of chunks to persist is +// compared to the number of chunks already evictable. +// +// This is a helper method only to be called by MemorySeriesStorage.maybeEvict. +func (s *MemorySeriesStorage) calculatePersistUrgency(ms *runtime.MemStats) int { + var ( + oldUrgency = atomic.LoadInt32(&s.persistUrgency) + newUrgency int32 + numChunksToPersist = s.getNumChunksToPersist() + ) + defer func() { + if newUrgency > 1000 { + newUrgency = 1000 + } + atomic.StoreInt32(&s.persistUrgency, newUrgency) + }() + + // Take the NextGC as the relevant heap size because the heap will grow + // to that size before GC kicks in. However, at times the current heap + // is already larger than NextGC, in which case we take that worse case. + heapSize := ms.NextGC + if ms.HeapAlloc > ms.NextGC { + heapSize = ms.HeapAlloc + } + + if numChunksToPersist > 0 { + newUrgency = int32(1000 * uint64(numChunksToPersist) / uint64(numChunksToPersist+s.evictList.Len()) * heapSize / s.targetHeapSize) + } + + // Only continue if a GC has happened since we were here last time. + if ms.NumGC == s.lastNumGC { + if oldUrgency > newUrgency { + // Never reduce urgency without a GC run. + newUrgency = oldUrgency + } + return 0 + } + s.lastNumGC = ms.NumGC + + if heapSize <= s.targetHeapSize { + return 0 // Heap still small enough, don't evict. + } + if s.evictList.Len() == 0 { + // We want to reduce heap size but there is nothing to evict. + newUrgency = 1000 + return 0 + } + numChunksToEvict := int((heapSize - s.targetHeapSize) / chunk.ChunkLen) + if numChunksToEvict > s.evictList.Len() { + numChunksToEvict = s.evictList.Len() + } + if u := int32(numChunksToEvict * 1000 / s.evictList.Len()); u > newUrgency { + newUrgency = u + } + return numChunksToEvict +} + // waitForNextFP waits an estimated duration, after which we want to process // another fingerprint so that we will process all fingerprints in a tenth of // s.dropAfter assuming that the system is doing nothing else, e.g. if we want @@ -1213,7 +1356,11 @@ func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger return } // Reduce the wait time according to the urgency score. - s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore()) + score, rushed := s.getPersistenceUrgencyScore() + if rushed { + score = 1 + } + s.waitForNextFP(s.fpToSeries.length(), 1-score) count++ } if count > 0 { @@ -1329,8 +1476,8 @@ loop: // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as // quickly as possible. So only checkpoint if the urgency score is < 1. - if dirty >= int64(s.checkpointDirtySeriesLimit) && - s.calculatePersistenceUrgencyScore() < 1 { + if _, rushed := s.getPersistenceUrgencyScore(); !rushed && + dirty >= int64(s.checkpointDirtySeriesLimit) { checkpointTimer.Reset(0) } } @@ -1404,7 +1551,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries( } if closed { s.incNumChunksToPersist(1) - s.numHeadChunks.Dec() + s.headChunks.Dec() } seriesWasDirty := series.dirty @@ -1426,7 +1573,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries( // an age of at least headChunkTimeout (which is very likely anyway). 
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout { s.fpToSeries.del(fp) - s.numSeries.Dec() + s.memorySeries.Dec() s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) @@ -1523,7 +1670,7 @@ func (s *MemorySeriesStorage) writeMemorySeries( if len(series.chunkDescs) == 0 && allDroppedFromPersistence { // All chunks dropped from both memory and persistence. Delete the series for good. s.fpToSeries.del(fp) - s.numSeries.Dec() + s.memorySeries.Dec() s.seriesOps.WithLabelValues(memoryPurge).Inc() s.persistence.unindexMetric(fp, series.metric) return true @@ -1586,12 +1733,12 @@ func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd return s.persistence.loadChunkDescs(fp, offsetFromEnd) } -// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way. +// getNumChunksToPersist returns chunksToPersist in a goroutine-safe way. func (s *MemorySeriesStorage) getNumChunksToPersist() int { return int(atomic.LoadInt64(&s.numChunksToPersist)) } -// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a +// incNumChunksToPersist increments chunksToPersist in a goroutine-safe way. Use a // negative 'by' to decrement. func (s *MemorySeriesStorage) incNumChunksToPersist(by int) { atomic.AddInt64(&s.numChunksToPersist, int64(by)) @@ -1600,89 +1747,57 @@ func (s *MemorySeriesStorage) incNumChunksToPersist(by int) { } } -// calculatePersistenceUrgencyScore calculates and returns an urgency score for -// the speed of persisting chunks. The score is between 0 and 1, where 0 means -// no urgency at all and 1 means highest urgency. +// getPersistenceUrgencyScore returns an urgency score for the speed of +// persisting chunks. The score is between 0 and 1, where 0 means no urgency at +// all and 1 means highest urgency. It also returns if the storage is in +// "rushed mode". // -// The score is the maximum of the two following sub-scores: +// The storage enters "rushed mode" if the score exceeds +// persintenceUrgencyScoreForEnteringRushedMode at the time this method is +// called. It will leave "rushed mode" if, at a later time this method is +// called, the score is below persintenceUrgencyScoreForLeavingRushedMode. +// "Rushed mode" plays a role for the adaptive series-sync-strategy. It also +// switches off early checkpointing (due to dirty series), and it makes series +// maintenance happen as quickly as possible. // -// (1) The first sub-score is the number of chunks waiting for persistence -// divided by the maximum number of chunks allowed to be waiting for -// persistence. +// A score of 1 will trigger throttling of sample ingestion. // -// (2) If there are more chunks in memory than allowed AND there are more chunks -// waiting for persistence than factorMinChunksToPersist times -// -storage.local.max-chunks-to-persist, then the second sub-score is the -// fraction the number of memory chunks has reached between -// -storage.local.memory-chunks and toleranceFactorForMemChunks times -// -storage.local.memory-chunks. -// -// Should the score ever hit persintenceUrgencyScoreForEnteringRushedMode, the -// storage locks into "rushed mode", in which the returned score is always -// bumped up to 1 until the non-bumped score is below -// persintenceUrgencyScoreForLeavingRushedMode. 
-// -// This method is not goroutine-safe, but it is only ever called by the single -// goroutine that is in charge of series maintenance. According to the returned -// score, series maintenance should be sped up. If a score of 1 is returned, -// checkpointing based on dirty-series count should be disabled, and series -// files should not by synced anymore provided the user has specified the -// adaptive sync strategy. -func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 { +// It is safe to call this method concurrently. +func (s *MemorySeriesStorage) getPersistenceUrgencyScore() (float64, bool) { s.rushedMtx.Lock() defer s.rushedMtx.Unlock() - var ( - chunksToPersist = float64(s.getNumChunksToPersist()) - maxChunksToPersist = float64(s.maxChunksToPersist) - memChunks = float64(atomic.LoadInt64(&chunk.NumMemChunks)) - maxMemChunks = float64(s.maxMemoryChunks) - ) - score := chunksToPersist / maxChunksToPersist - if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist { - score = math.Max( - score, - (memChunks/maxMemChunks-1)/(toleranceFactorMemChunks-1), - ) - } + score := float64(atomic.LoadInt32(&s.persistUrgency)) / 1000 if score > 1 { score = 1 } - s.persistenceUrgencyScore.Set(score) if s.rushed { // We are already in rushed mode. If the score is still above - // persintenceUrgencyScoreForLeavingRushedMode, return 1 and - // leave things as they are. + // persintenceUrgencyScoreForLeavingRushedMode, return the score + // and leave things as they are. if score > persintenceUrgencyScoreForLeavingRushedMode { - return 1 + return score, true } // We are out of rushed mode! s.rushed = false - s.rushedMode.Set(0) log. With("urgencyScore", score). - With("chunksToPersist", int(chunksToPersist)). - With("maxChunksToPersist", int(maxChunksToPersist)). - With("memoryChunks", int(memChunks)). - With("maxMemoryChunks", int(maxMemChunks)). + With("chunksToPersist", s.getNumChunksToPersist()). + With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)). Info("Storage has left rushed mode.") - return score + return score, false } if score > persintenceUrgencyScoreForEnteringRushedMode { // Enter rushed mode. s.rushed = true - s.rushedMode.Set(1) log. With("urgencyScore", score). - With("chunksToPersist", int(chunksToPersist)). - With("maxChunksToPersist", int(maxChunksToPersist)). - With("memoryChunks", int(memChunks)). - With("maxMemoryChunks", int(maxMemChunks)). + With("chunksToPersist", s.getNumChunksToPersist()). + With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)). Warn("Storage has entered rushed mode.") - return 1 } - return score + return score, s.rushed } // quarantineSeries registers the provided fingerprint for quarantining. It @@ -1742,10 +1857,10 @@ func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, if series, ok = s.fpToSeries.get(fp); ok { s.fpToSeries.del(fp) - s.numSeries.Dec() + s.memorySeries.Dec() m = series.metric - // Adjust s.numChunksToPersist and chunk.NumMemChunks down by + // Adjust s.chunksToPersist and chunk.NumMemChunks down by // the number of chunks in this series that are not // persisted yet. Persisted chunks will be deducted from // chunk.NumMemChunks upon eviction. 
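// A worked run of the urgency arithmetic above, as a sketch rather than part
// of the patch: the numbers are taken from the "target heap size exceeded, GC
// has happened" case in the test table added to storage_test.go below, and
// chunk.ChunkLen is assumed to be its usual 1024 bytes. With 1000 chunks
// waiting for persistence, 3000 evictable chunks, a target heap size of
// 1,000,000 bytes, and ms.NextGC = 1,100,000 (larger than ms.HeapAlloc), the
// integer math in calculatePersistUrgency gives:
//
//	urgency    = 1000 * 1000 / (1000 + 3000) * 1100000 / 1000000 = 275
//	numToEvict = (1100000 - 1000000) / 1024                      = 97
//	bump       = 97 * 1000 / 3000                                = 32   (less than 275, so the urgency stays at 275)
//
// i.e. getPersistenceUrgencyScore reports a score of 0.275 and the evict run
// frees 97 chunks, matching wantPersistUrgency: 275 and wantChunksToEvict: 97.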
@@ -1802,19 +1917,19 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { ch <- s.persistErrors.Desc() ch <- s.queuedChunksToPersist.Desc() - ch <- maxChunksToPersistDesc - ch <- numChunksToPersistDesc - ch <- s.numSeries.Desc() - ch <- s.numHeadChunks.Desc() + ch <- s.chunksToPersist.Desc() + ch <- s.memorySeries.Desc() + ch <- s.headChunks.Desc() ch <- s.dirtySeries.Desc() s.seriesOps.Describe(ch) - ch <- s.ingestedSamplesCount.Desc() - s.discardedSamplesCount.Describe(ch) - ch <- s.nonExistentSeriesMatchesCount.Desc() - ch <- chunk.NumMemChunksDesc + ch <- s.ingestedSamples.Desc() + s.discardedSamples.Describe(ch) + ch <- s.nonExistentSeriesMatches.Desc() + ch <- s.memChunks.Desc() s.maintainSeriesDuration.Describe(ch) ch <- s.persistenceUrgencyScore.Desc() ch <- s.rushedMode.Desc() + ch <- s.targetHeapSizeBytes.Desc() } // Collect implements prometheus.Collector. @@ -1824,34 +1939,17 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) { ch <- s.persistErrors ch <- s.queuedChunksToPersist - ch <- prometheus.MustNewConstMetric( - maxChunksToPersistDesc, - prometheus.GaugeValue, - float64(s.maxChunksToPersist), - ) - ch <- prometheus.MustNewConstMetric( - numChunksToPersistDesc, - prometheus.GaugeValue, - float64(s.getNumChunksToPersist()), - ) - ch <- s.numSeries - ch <- s.numHeadChunks + ch <- s.chunksToPersist + ch <- s.memorySeries + ch <- s.headChunks ch <- s.dirtySeries s.seriesOps.Collect(ch) - ch <- s.ingestedSamplesCount - s.discardedSamplesCount.Collect(ch) - ch <- s.nonExistentSeriesMatchesCount - ch <- prometheus.MustNewConstMetric( - maxMemChunksDesc, - prometheus.GaugeValue, - float64(s.maxMemoryChunks), - ) - ch <- prometheus.MustNewConstMetric( - chunk.NumMemChunksDesc, - prometheus.GaugeValue, - float64(atomic.LoadInt64(&chunk.NumMemChunks)), - ) + ch <- s.ingestedSamples + s.discardedSamples.Collect(ch) + ch <- s.nonExistentSeriesMatches + ch <- s.memChunks s.maintainSeriesDuration.Collect(ch) ch <- s.persistenceUrgencyScore ch <- s.rushedMode + ch <- s.targetHeapSizeBytes } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index f456991de..2ff4c2686 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "os" + "runtime" "strconv" "sync/atomic" "testing" @@ -824,8 +825,7 @@ func TestLoop(t *testing.T) { directory := testutil.NewTemporaryDirectory("test_storage", t) defer directory.Close() o := &MemorySeriesStorageOptions{ - MemoryChunks: 50, - MaxChunksToPersist: 1000000, + TargetHeapSize: 100000, PersistenceRetentionPeriod: 24 * 7 * time.Hour, PersistenceStoragePath: directory.Path(), HeadChunkTimeout: 5 * time.Minute, @@ -877,7 +877,6 @@ func testChunk(t *testing.T, encoding chunk.Encoding) { for m := range s.fpToSeries.iter() { s.fpLocker.Lock(m.fp) - defer s.fpLocker.Unlock(m.fp) // TODO remove, see below var values []model.SamplePair for _, cd := range m.series.chunkDescs { if cd.IsEvicted() { @@ -900,7 +899,7 @@ func testChunk(t *testing.T, encoding chunk.Encoding) { t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value) } } - //s.fpLocker.Unlock(m.fp) + s.fpLocker.Unlock(m.fp) } log.Info("test done, closing") } @@ -1459,8 +1458,8 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) { s, closer := NewTestStorage(t, encoding) defer closer.Close() - // Adjust memory chunks to lower value to see evictions. - s.maxMemoryChunks = 1 + // Adjust target heap size to lower value to see evictions. 
+ s.targetHeapSize = 1000000 for _, sample := range samples { s.Append(sample) @@ -1478,7 +1477,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) { // Maintain series without any dropped chunks. s.maintainMemorySeries(fp, 0) // Give the evict goroutine an opportunity to run. - time.Sleep(250 * time.Millisecond) + time.Sleep(1250 * time.Millisecond) // Maintain series again to trigger chunk.Desc eviction. s.maintainMemorySeries(fp, 0) @@ -1605,8 +1604,7 @@ func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) { directory := testutil.NewTemporaryDirectory("test_storage", b) defer directory.Close() o := &MemorySeriesStorageOptions{ - MemoryChunks: 100, - MaxChunksToPersist: 1000000, + TargetHeapSize: 200000, PersistenceRetentionPeriod: time.Hour, PersistenceStoragePath: directory.Path(), HeadChunkTimeout: 5 * time.Minute, @@ -2010,3 +2008,239 @@ func TestAppendOutOfOrder(t *testing.T) { } } } + +func TestCalculatePersistUrgency(t *testing.T) { + tests := map[string]struct { + persistUrgency int32 + lenEvictList int + numChunksToPersist int64 + targetHeapSize, msNextGC, msHeapAlloc uint64 + msNumGC, lastNumGC uint32 + + wantPersistUrgency int32 + wantChunksToEvict int + wantLastNumGC uint32 + }{ + "all zeros": { + persistUrgency: 0, + lenEvictList: 0, + numChunksToPersist: 0, + targetHeapSize: 0, + msNextGC: 0, + msHeapAlloc: 0, + msNumGC: 0, + lastNumGC: 0, + + wantPersistUrgency: 0, + wantChunksToEvict: 0, + wantLastNumGC: 0, + }, + "far from target heap size, plenty of chunks to persist, GC has happened": { + persistUrgency: 500, + lenEvictList: 1000, + numChunksToPersist: 100, + targetHeapSize: 1000000, + msNextGC: 500000, + msHeapAlloc: 400000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 45, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "far from target heap size, plenty of chunks to persist, GC hasn't happened, urgency must not decrease": { + persistUrgency: 500, + lenEvictList: 1000, + numChunksToPersist: 100, + targetHeapSize: 1000000, + msNextGC: 500000, + msHeapAlloc: 400000, + msNumGC: 42, + lastNumGC: 42, + + wantPersistUrgency: 500, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "far from target heap size but no chunks to persist": { + persistUrgency: 50, + lenEvictList: 0, + numChunksToPersist: 100, + targetHeapSize: 1000000, + msNextGC: 500000, + msHeapAlloc: 400000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 500, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "far from target heap size but no chunks to persist, HeapAlloc > NextGC": { + persistUrgency: 50, + lenEvictList: 0, + numChunksToPersist: 100, + targetHeapSize: 1000000, + msNextGC: 500000, + msHeapAlloc: 600000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 600, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "target heap size exceeded but GC hasn't happened": { + persistUrgency: 50, + lenEvictList: 3000, + numChunksToPersist: 1000, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 42, + + wantPersistUrgency: 275, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "target heap size exceeded, GC has happened": { + persistUrgency: 50, + lenEvictList: 3000, + numChunksToPersist: 1000, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 275, + wantChunksToEvict: 97, + wantLastNumGC: 42, + }, + "target heap size exceeded, GC has happened, urgency bumped due to low number of evictable chunks": { + persistUrgency: 50, + 
lenEvictList: 300, + numChunksToPersist: 100, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 323, + wantChunksToEvict: 97, + wantLastNumGC: 42, + }, + "target heap size exceeded but no evictable chunks and GC hasn't happened": { + persistUrgency: 50, + lenEvictList: 0, + numChunksToPersist: 1000, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 42, + + wantPersistUrgency: 1000, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "target heap size exceeded but no evictable chunks and GC has happened": { + persistUrgency: 50, + lenEvictList: 0, + numChunksToPersist: 1000, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 1000, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "target heap size exceeded, very few evictable chunks, GC hasn't happened": { + persistUrgency: 50, + lenEvictList: 10, + numChunksToPersist: 1000, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 42, + + wantPersistUrgency: 1000, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "target heap size exceeded, some evictable chunks (but not enough), GC hasn't happened": { + persistUrgency: 50, + lenEvictList: 50, + numChunksToPersist: 250, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 42, + + wantPersistUrgency: 916, + wantChunksToEvict: 0, + wantLastNumGC: 42, + }, + "target heap size exceeded, some evictable chunks (but not enough), GC has happened": { + persistUrgency: 50, + lenEvictList: 50, + numChunksToPersist: 250, + targetHeapSize: 1000000, + msNextGC: 1100000, + msHeapAlloc: 900000, + msNumGC: 42, + lastNumGC: 41, + + wantPersistUrgency: 1000, + wantChunksToEvict: 50, + wantLastNumGC: 42, + }, + } + + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + for scenario, test := range tests { + s.persistUrgency = test.persistUrgency + s.numChunksToPersist = test.numChunksToPersist + s.targetHeapSize = test.targetHeapSize + s.lastNumGC = test.lastNumGC + s.evictList.Init() + for i := 0; i < test.lenEvictList; i++ { + s.evictList.PushBack(&struct{}{}) + } + ms := runtime.MemStats{ + NextGC: test.msNextGC, + HeapAlloc: test.msHeapAlloc, + NumGC: test.msNumGC, + } + chunksToEvict := s.calculatePersistUrgency(&ms) + + if chunksToEvict != test.wantChunksToEvict { + t.Errorf( + "scenario %q: got %d chunks to evict, want %d", + scenario, chunksToEvict, test.wantChunksToEvict, + ) + } + if s.persistUrgency != test.wantPersistUrgency { + t.Errorf( + "scenario %q: got persist urgency %d, want %d", + scenario, s.persistUrgency, test.wantPersistUrgency, + ) + } + if s.lastNumGC != test.wantLastNumGC { + t.Errorf( + "scenario %q: got lastNumGC %d , want %d", + scenario, s.lastNumGC, test.wantLastNumGC, + ) + } + } +} diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index dc5a4502d..7b6cc51e6 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -45,8 +45,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage chunk.DefaultEncoding = encoding directory := testutil.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ - MemoryChunks: 1000000, - MaxChunksToPersist: 1000000, + TargetHeapSize: 1000000000, PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. 
PersistenceStoragePath: directory.Path(), HeadChunkTimeout: 5 * time.Minute,
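// The deprecated-flag translation in cmd/prometheus/config.go can be
// summarized with the sketch below. It is an illustration only, using a
// hypothetical helper name (targetHeapSizeFromMemoryChunks) that does not
// appear in the patch: each old memory chunk is converted at 3072 bytes of
// heap, so the previous -storage.local.memory-chunks default of 1024*1024
// maps to a 3 GiB target heap, while the new -storage.local.target-heap-size
// flag defaults to 2 GiB directly.

package main

import "fmt"

// targetHeapSizeFromMemoryChunks mirrors the override performed in parse():
// cfg.storage.TargetHeapSize = cfg.deprecatedMemoryChunks * 3072.
func targetHeapSizeFromMemoryChunks(memoryChunks uint64) uint64 {
	return memoryChunks * 3072
}

func main() {
	oldDefault := uint64(1024 * 1024)             // old -storage.local.memory-chunks default
	newDefault := uint64(2 * 1024 * 1024 * 1024)  // new -storage.local.target-heap-size default
	fmt.Println(targetHeapSizeFromMemoryChunks(oldDefault)) // 3221225472, i.e. 3 GiB
	fmt.Println(newDefault)                                  // 2147483648, i.e. 2 GiB
}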