From d2ab49c396223629f710921301e020f70477dfda Mon Sep 17 00:00:00 2001
From: beorn7
Date: Fri, 6 Feb 2015 14:54:53 +0100
Subject: [PATCH 1/3] Make the persist queue length configurable.

Also, set a much higher default value.

Chunk persist requests can be quite spiky. If you collect a large
number of time series that are very similar, they will tend to finish
up a chunk at about the same time. There is no reason we need to back
up scraping just because of that. The rationale of the new default
value is "1/8 of the chunks in memory".
---
 main.go                  | 10 ++++++----
 storage/local/storage.go | 31 +++++++++++++------------------
 2 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/main.go b/main.go
index ea23a0026..ffd74095b 100644
--- a/main.go
+++ b/main.go
@@ -47,7 +47,7 @@ var (
     alertmanagerURL           = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
     notificationQueueCapacity = flag.Int("alertmanager.notification-queue-capacity", 100, "The capacity of the queue for pending alert manager notifications.")

-    metricsStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
+    persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")

     remoteTSDBUrl     = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
     remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
@@ -56,7 +56,8 @@ var (

     numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")

-    storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
+    persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
+    persistenceQueueCapacity   = flag.Int("storage.local.persistence-queue-capacity", 128*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")

     checkpointInterval         = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
     checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
@@ -116,8 +117,9 @@ func NewPrometheus() *prometheus {

     o := &local.MemorySeriesStorageOptions{
         MemoryChunks:               *numMemoryChunks,
-        PersistenceStoragePath:     *metricsStoragePath,
-        PersistenceRetentionPeriod: *storageRetentionPeriod,
+        PersistenceStoragePath:     *persistenceStoragePath,
+        PersistenceRetentionPeriod: *persistenceRetentionPeriod,
+        PersistenceQueueCapacity:   *persistenceQueueCapacity,
         CheckpointInterval:         *checkpointInterval,
         CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
         Dirty:                      *storageDirty,
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 534b9cdc9..843929d2c 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -29,7 +29,6 @@ import (
 )

 const (
-    persistQueueCap  = 1024
     evictRequestsCap = 1024
     chunkLen         = 1024

@@ -82,6 +81,7 @@ type memorySeriesStorage struct {

     persistLatency       prometheus.Summary
     persistErrors        prometheus.Counter
+    persistQueueCapacity prometheus.Metric
     persistQueueLength   prometheus.Gauge
     numSeries            prometheus.Gauge
     seriesOps            *prometheus.CounterVec
@@ -97,6 +97,7 @@ type MemorySeriesStorageOptions struct {
     MemoryChunks               int           // How many chunks to keep in memory.
     PersistenceStoragePath     string        // Location of persistence files.
     PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
+    PersistenceQueueCapacity   int           // Capacity of queue for chunks to be persisted.
     CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
     CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
     Dirty                      bool          // Force the storage to consider itself dirty on startup.
@@ -134,7 +135,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
         checkpointInterval:         o.CheckpointInterval,
         checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,

-        persistQueue:   make(chan persistRequest, persistQueueCap),
+        persistQueue:   make(chan persistRequest, o.PersistenceQueueCapacity),
         persistStopped: make(chan struct{}),
         persistence:    p,

@@ -157,6 +158,14 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
             Name:      "persist_errors_total",
             Help:      "The total number of errors while persisting chunks.",
         }),
+        persistQueueCapacity: prometheus.MustNewConstMetric(
+            prometheus.NewDesc(
+                prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
+                "The total capacity of the persist queue.",
+                nil, nil,
+            ),
+            prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
+        ),
         persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -837,32 +846,19 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
     return s.persistence.loadChunkDescs(fp, beforeTime)
 }

-// To expose persistQueueCap as metric:
-var (
-    persistQueueCapDesc = prometheus.NewDesc(
-        prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
-        "The total capacity of the persist queue.",
-        nil, nil,
-    )
-    persistQueueCapGauge = prometheus.MustNewConstMetric(
-        persistQueueCapDesc, prometheus.GaugeValue, persistQueueCap,
-    )
-)
-
 // Describe implements prometheus.Collector.
 func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
     s.persistence.Describe(ch)

     ch <- s.persistLatency.Desc()
     ch <- s.persistErrors.Desc()
+    ch <- s.persistQueueCapacity.Desc()
     ch <- s.persistQueueLength.Desc()
     ch <- s.numSeries.Desc()
     s.seriesOps.Describe(ch)
     ch <- s.ingestedSamplesCount.Desc()
     ch <- s.invalidPreloadRequestsCount.Desc()
-    ch <- persistQueueCapDesc
-
     ch <- numMemChunksDesc
 }

@@ -872,14 +868,13 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
     ch <- s.persistLatency
     ch <- s.persistErrors
+    ch <- s.persistQueueCapacity
     ch <- s.persistQueueLength
     ch <- s.numSeries
     s.seriesOps.Collect(ch)
     ch <- s.ingestedSamplesCount
     ch <- s.invalidPreloadRequestsCount
-    ch <- persistQueueCapGauge
-
     count := atomic.LoadInt64(&numMemChunks)
     ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
 }

From 5678a869245e54b44201e99cebb54042e7c74eac Mon Sep 17 00:00:00 2001
From: beorn7
Date: Fri, 6 Feb 2015 16:44:56 +0100
Subject: [PATCH 2/3] Throttle scraping if a scrape took longer than the
 configured interval.

The simple algorithm applied here increases the actual interval
incrementally, whenever and for as long as the scrape itself takes
longer than the configured interval. Once scrapes are faster again,
the actual interval iteratively decreases back to the configured
value.
---
 retrieval/target.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/retrieval/target.go b/retrieval/target.go
index 218338683..3e2a22f67 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -265,10 +265,18 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
         case <-t.scraperStopping:
             return
         case <-ticker.C:
-            targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
+            took := time.Since(t.lastScrape)
             t.Lock() // Write t.lastScrape requires locking.
             t.lastScrape = time.Now()
             t.Unlock()
+            targetIntervalLength.WithLabelValues(interval.String()).Observe(
+                float64(took) / float64(time.Second), // Sub-second precision.
+            )
+            // Throttle the scrape if it took longer than interval - by
+            // sleeping for the time it took longer. This will make the
+            // actual scrape interval increase as long as a scrape takes
+            // longer than the interval we are aiming for.
+            time.Sleep(took - interval)
             t.scrape(ingester)
         }
     }

From 16a1a6d32426265d37dbf0428c9fccb6d690fb09 Mon Sep 17 00:00:00 2001
From: beorn7
Date: Fri, 6 Feb 2015 18:30:33 +0100
Subject: [PATCH 3/3] Add another check for stopped scraper.
---
 retrieval/target.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/retrieval/target.go b/retrieval/target.go
index 3e2a22f67..7c4ba49f0 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -277,6 +277,13 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
             // actual scrape interval increase as long as a scrape takes
             // longer than the interval we are aiming for.
             time.Sleep(took - interval)
+            // After the sleep, we should check again if we have been stopped.
+            select {
+            case <-t.scraperStopping:
+                return
+            default:
+                // Do nothing.
+            }
             t.scrape(ingester)
         }
     }
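
Taken out of the diff context, the metric change in the first patch is easier to follow as a standalone program. The sketch below shows the same client_golang pattern: a custom prometheus.Collector that publishes a fixed configuration value (a queue capacity) as a constant gauge next to a live queue-length gauge, mirroring how the patch moves persist_queue_capacity from a package-level const metric into the storage struct. The queueCollector type, the namespace/subsystem strings, and the example values are illustrative assumptions, not code from the patch.

package main

import (
    "github.com/prometheus/client_golang/prometheus"
)

// queueCollector publishes a fixed capacity and a live queue length,
// analogous to persist_queue_capacity and persist_queue_length above.
// All names here are illustrative only.
type queueCollector struct {
    capacity prometheus.Metric // Constant; built once at construction time.
    length   prometheus.Gauge  // Updated whenever the queue grows or shrinks.
}

func newQueueCollector(capacity int) *queueCollector {
    return &queueCollector{
        capacity: prometheus.MustNewConstMetric(
            prometheus.NewDesc(
                prometheus.BuildFQName("prometheus", "local_storage", "persist_queue_capacity"),
                "The total capacity of the persist queue.",
                nil, nil,
            ),
            prometheus.GaugeValue, float64(capacity),
        ),
        length: prometheus.NewGauge(prometheus.GaugeOpts{
            Namespace: "prometheus",
            Subsystem: "local_storage",
            Name:      "persist_queue_length",
            Help:      "The current number of chunks waiting in the persist queue.",
        }),
    }
}

// Describe implements prometheus.Collector.
func (c *queueCollector) Describe(ch chan<- *prometheus.Desc) {
    ch <- c.capacity.Desc()
    ch <- c.length.Desc()
}

// Collect implements prometheus.Collector.
func (c *queueCollector) Collect(ch chan<- prometheus.Metric) {
    ch <- c.capacity // Already a complete metric; nothing to compute here.
    ch <- c.length
}

func main() {
    qc := newQueueCollector(128 * 1024)
    prometheus.MustRegister(qc)
    qc.length.Set(42) // Example update from the code that owns the queue.
}

Building the constant metric once with MustNewConstMetric and handing the same value out on every Collect call is what lets the exported capacity follow the configured option instead of a hard-coded persistQueueCap.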
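
The interplay of the two retrieval patches above is easiest to see in one piece. The following self-contained sketch models the resulting scrape loop: it measures how long the previous round took, sleeps for the overrun, and re-checks the stop channel after the sleep. The runScraper function, the plain stop channel, and the dummy scrape callback are illustrative stand-ins for the target internals (t.scraperStopping, t.scrape, and the locking around t.lastScrape), not the actual retrieval code.

package main

import (
    "fmt"
    "time"
)

// runScraper models the loop from RunScraper after both retrieval patches:
// if the previous round ran longer than the configured interval, sleep for
// the excess before scraping again. time.Sleep returns immediately for
// negative durations, so no explicit "took > interval" check is needed.
func runScraper(interval time.Duration, stop <-chan struct{}, scrape func()) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    lastScrape := time.Now()
    scrape()
    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            took := time.Since(lastScrape)
            lastScrape = time.Now()
            // Throttle: extend the effective interval by however much the
            // previous round ran over the configured interval.
            time.Sleep(took - interval)
            // The sleep may have been long; check for shutdown again before
            // starting another scrape (the third patch).
            select {
            case <-stop:
                return
            default:
            }
            scrape()
        }
    }
}

func main() {
    stop := make(chan struct{})
    go runScraper(200*time.Millisecond, stop, func() {
        fmt.Println("scrape at", time.Now().Format("15:04:05.000"))
        time.Sleep(300 * time.Millisecond) // Simulate a slow scrape.
    })
    time.Sleep(2 * time.Second)
    close(stop)
}

Because the throttle only adds the previous round's overrun, the effective interval stretches while scrapes are slow and shrinks back to the configured interval as soon as they speed up, which is the behaviour the second commit message describes; the extra select is needed because the sleep can last long enough for a shutdown request to arrive in the meantime.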