diff --git a/storage/local/chunk.go b/storage/local/chunk.go index f1d2d5d4c..b60e8767c 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -16,57 +16,13 @@ package local import ( "io" "sync" + "sync/atomic" clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/storage/metric" ) -// Instrumentation. -// Note that the metrics are often set in bulk by the caller -// for performance reasons. -var ( - chunkOps = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunk_ops_total", - Help: "The total number of chunk operations by their type.", - }, - []string{opTypeLabel}, - ) - chunkDescOps = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunkdesc_ops_total", - Help: "The total number of chunk descriptor operations by their type.", - }, - []string{opTypeLabel}, - ) -) - -const ( - // Op-types for chunkOps. - createAndPin = "create" // A chunkDesc creation with refCount=1. - persistAndUnpin = "persist" - pin = "pin" // Excluding the pin on creation. - unpin = "unpin" // Excluding the unpin on persisting. - clone = "clone" - transcode = "transcode" - purge = "purge" - - // Op-types for chunkOps and chunkDescOps. - evict = "evict" - load = "load" -) - -func init() { - prometheus.MustRegister(chunkOps) - prometheus.MustRegister(chunkDescOps) -} - type chunkDesc struct { sync.Mutex chunk chunk @@ -76,10 +32,12 @@ type chunkDesc struct { chunkLastTime clientmodel.Timestamp // Used if chunk is evicted. } -// newChunkDesc creates a new chunkDesc pointing to the given chunk (may be -// nil). The refCount of the new chunkDesc is 1. +// newChunkDesc creates a new chunkDesc pointing to the given chunk. The +// refCount of the new chunkDesc is 1. func newChunkDesc(c chunk) *chunkDesc { chunkOps.WithLabelValues(createAndPin).Inc() + atomic.AddInt64(&numMemChunks, 1) + atomic.AddInt64(&numMemChunkDescs, 1) return &chunkDesc{chunk: c, refCount: 1} } @@ -179,6 +137,7 @@ func (cd *chunkDesc) evictNow() { cd.chunkLastTime = cd.chunk.lastTime() cd.chunk = nil chunkOps.WithLabelValues(evict).Inc() + atomic.AddInt64(&numMemChunks, -1) } // chunk is the interface for all chunks. Chunks are generally not diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go new file mode 100644 index 000000000..1c061b25b --- /dev/null +++ b/storage/local/instrumentation.go @@ -0,0 +1,90 @@ +// Copyright 2014 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import "github.com/prometheus/client_golang/prometheus" + +// Usually, a separate file for instrumentation is frowned upon. Metrics should +// be close to where they are used. However,the metrics below are set all over +// the place, so we go for a separate instrumentation file in this case. +var ( + chunkOps = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunk_ops_total", + Help: "The total number of chunk operations by their type.", + }, + []string{opTypeLabel}, + ) + chunkDescOps = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkdesc_ops_total", + Help: "The total number of chunk descriptor operations by their type.", + }, + []string{opTypeLabel}, + ) +) + +const ( + namespace = "prometheus" + subsystem = "local_storage" + + opTypeLabel = "type" + + // Op-types for seriesOps. + create = "create" + archive = "archive" + unarchive = "unarchive" + memoryPurge = "purge_from_memory" + archivePurge = "purge_from_archive" + + // Op-types for chunkOps. + createAndPin = "create" // A chunkDesc creation with refCount=1. + persistAndUnpin = "persist" + pin = "pin" // Excluding the pin on creation. + unpin = "unpin" // Excluding the unpin on persisting. + clone = "clone" + transcode = "transcode" + purge = "purge" + + // Op-types for chunkOps and chunkDescOps. + evict = "evict" + load = "load" +) + +func init() { + prometheus.MustRegister(chunkOps) + prometheus.MustRegister(chunkDescOps) +} + +var ( + // Global counters, also used internally, so not implemented as + // metrics. Collected in memorySeriesStorage.Collect. + numMemChunks, numMemChunkDescs int64 + + // Metric descriptors for the above. + numMemChunksDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "memory_chunks"), + "The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).", + nil, nil, + ) + numMemChunkDescsDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "memory_chunkdescs"), + "The current number of chunk descriptors in memory.", + nil, nil, + ) +) diff --git a/storage/local/interface.go b/storage/local/interface.go index c1bee10e8..88e70ca07 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -21,15 +21,6 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -// String constants for instrumentation. -const ( - namespace = "prometheus" - subsystem = "local_storage" - - errorLabel = "error" - opTypeLabel = "type" -) - // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. type Storage interface { diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 80164bc1c..0e702bfbd 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -21,6 +21,7 @@ import ( "os" "path" "sync" + "sync/atomic" "time" "github.com/golang/glog" @@ -335,6 +336,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie cds = append(cds, cd) } chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) + atomic.AddInt64(&numMemChunkDescs, int64(len(cds))) return cds, nil } @@ -422,6 +424,8 @@ func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) e // nothing else is running in storage land. This method is utterly // goroutine-unsafe. func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) { + var chunksTotal, chunkDescsTotal int64 + f, err := os.Open(p.headsPath()) if os.IsNotExist(err) { return newSeriesMap(), nil @@ -471,6 +475,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) { return nil, err } chunkDescs := make([]*chunkDesc, numChunkDescs) + chunkDescsTotal += numChunkDescs for i := int64(0); i < numChunkDescs; i++ { if headChunkPersisted || i < numChunkDescs-1 { @@ -488,6 +493,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) { } } else { // Non-persisted head chunk. + chunksTotal++ chunkType, err := r.ReadByte() if err != nil { return nil, err @@ -507,6 +513,8 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) { headChunkPersisted: headChunkPersisted, } } + atomic.AddInt64(&numMemChunks, chunksTotal) + atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal) return &seriesMap{m: fingerprintToSeries}, nil } diff --git a/storage/local/series.go b/storage/local/series.go index fc17be1f5..6bf323b35 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -17,6 +17,7 @@ import ( "math" "sort" "sync" + "sync/atomic" clientmodel "github.com/prometheus/client_golang/model" @@ -231,10 +232,12 @@ func (s *memorySeries) evictOlderThan( lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) if lenToKeep < len(s.chunkDescs) { s.chunkDescsLoaded = false - chunkDescOps.WithLabelValues(evict).Add(float64(len(s.chunkDescs) - lenToKeep)) + lenEvicted := len(s.chunkDescs) - lenToKeep + chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) + atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted)) s.chunkDescs = append( make([]*chunkDesc, 0, lenToKeep), - s.chunkDescs[len(s.chunkDescs)-lenToKeep:]..., + s.chunkDescs[lenEvicted:]..., ) } } @@ -280,12 +283,10 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool { keepIdx = i break } - } - - for i := 0; i < keepIdx; i++ { s.chunkDescs[i].evictOnUnpin() } s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...) + atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx)) return len(s.chunkDescs) == 0 } @@ -318,6 +319,7 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes s.chunkDescs[loadIndexes[i]].setChunk(c) } chunkOps.WithLabelValues(load).Add(float64(len(chunks))) + atomic.AddInt64(&numMemChunks, int64(len(chunks))) } return pinnedChunkDescs, nil diff --git a/storage/local/storage.go b/storage/local/storage.go index a9cc11c68..a06c51f6d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,6 +16,7 @@ package local import ( "fmt" + "sync/atomic" "time" "github.com/golang/glog" @@ -28,93 +29,6 @@ import ( const persistQueueCap = 1024 -// Instrumentation. -var ( - persistLatency = prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "persist_latency_microseconds", - Help: "A summary of latencies for persisting each chunk.", - }) - persistErrors = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "persist_errors_total", - Help: "A counter of errors persisting chunks.", - }, - []string{errorLabel}, - ) - persistQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "persist_queue_length", - Help: "The current number of chunks waiting in the persist queue.", - }) - persistQueueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "persist_queue_capacity", - Help: "The total capacity of the persist queue.", - }) - numSeries = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "memory_series", - Help: "The current number of series in memory.", - }) - seriesOps = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "series_ops_total", - Help: "The total number of series operations by their type.", - }, - []string{opTypeLabel}, - ) - ingestedSamplesCount = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "ingested_samples_total", - Help: "The total number of samples ingested.", - }) - purgeDuration = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "purge_duration_milliseconds", - Help: "The duration of the last storage purge iteration in milliseconds.", - }) - evictionDuration = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "eviction_duration_milliseconds", - Help: "The duration of the last memory eviction iteration in milliseconds.", - }) -) - -const ( - // Op-types for seriesOps. - create = "create" - archive = "archive" - unarchive = "unarchive" - memoryPurge = "purge_from_memory" - archivePurge = "purge_from_archive" -) - -func init() { - prometheus.MustRegister(persistLatency) - prometheus.MustRegister(persistErrors) - prometheus.MustRegister(persistQueueLength) - prometheus.MustRegister(persistQueueCapacity) - prometheus.MustRegister(numSeries) - prometheus.MustRegister(seriesOps) - prometheus.MustRegister(ingestedSamplesCount) - prometheus.MustRegister(purgeDuration) - prometheus.MustRegister(evictionDuration) - - persistQueueCapacity.Set(float64(persistQueueCap)) -} - type storageState uint const ( @@ -139,6 +53,14 @@ type memorySeriesStorage struct { persistQueue chan *persistRequest persistence *persistence + + persistLatency prometheus.Summary + persistErrors *prometheus.CounterVec + persistQueueLength prometheus.Gauge + numSeries prometheus.Gauge + seriesOps *prometheus.CounterVec + ingestedSamplesCount prometheus.Counter + purgeDuration, evictionDuration prometheus.Gauge } // MemorySeriesStorageOptions contains options needed by @@ -165,6 +87,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { return nil, err } glog.Infof("%d series loaded.", fingerprintToSeries.length()) + numSeries := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_series", + Help: "The current number of series in memory.", + }) numSeries.Set(float64(fingerprintToSeries.length())) return &memorySeriesStorage{ @@ -182,6 +110,56 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { persistQueue: make(chan *persistRequest, persistQueueCap), persistence: p, + + persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_latency_microseconds", + Help: "A summary of latencies for persisting each chunk.", + }), + persistErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_errors_total", + Help: "A counter of errors persisting chunks.", + }, + []string{"error"}, + ), + persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_queue_length", + Help: "The current number of chunks waiting in the persist queue.", + }), + numSeries: numSeries, + seriesOps: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_ops_total", + Help: "The total number of series operations by their type.", + }, + []string{opTypeLabel}, + ), + ingestedSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "ingested_samples_total", + Help: "The total number of samples ingested.", + }), + purgeDuration: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "purge_duration_milliseconds", + Help: "The duration of the last storage purge iteration in milliseconds.", + }), + evictionDuration: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "eviction_duration_milliseconds", + Help: "The duration of the last memory eviction iteration in milliseconds.", + }), }, nil } @@ -196,7 +174,7 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { s.appendSample(sample) } - ingestedSamplesCount.Add(float64(len(samples))) + s.ingestedSamplesCount.Add(float64(len(samples))) } func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { @@ -219,15 +197,15 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err) } if unarchived { - seriesOps.WithLabelValues(unarchive).Inc() + s.seriesOps.WithLabelValues(unarchive).Inc() } else { // This was a genuinely new series, so index the metric. s.persistence.indexMetric(m, fp) - seriesOps.WithLabelValues(create).Inc() + s.seriesOps.WithLabelValues(create).Inc() } series = newMemorySeries(m, !unarchived) s.fingerprintToSeries.put(fp, series) - numSeries.Inc() + s.numSeries.Inc() } return series } @@ -294,7 +272,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { defer func(begin time.Time) { - evictionDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond)) + s.evictionDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond)) }(time.Now()) for m := range s.fingerprintToSeries.iter() { @@ -304,13 +282,13 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { m.fp, s.persistQueue, ) { s.fingerprintToSeries.del(m.fp) - numSeries.Dec() + s.numSeries.Dec() if err := s.persistence.archiveMetric( m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(), ); err != nil { glog.Errorf("Error archiving metric %v: %v", m.series.metric, err) } else { - seriesOps.WithLabelValues(archive).Inc() + s.seriesOps.WithLabelValues(archive).Inc() } } s.fpLocker.Unlock(m.fp) @@ -319,12 +297,12 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { func (s *memorySeriesStorage) handlePersistQueue() { for req := range s.persistQueue { - persistQueueLength.Set(float64(len(s.persistQueue))) + s.persistQueueLength.Set(float64(len(s.persistQueue))) start := time.Now() err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk) - persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond)) + s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond)) if err != nil { - persistErrors.WithLabelValues(err.Error()).Inc() + s.persistErrors.WithLabelValues(err.Error()).Inc() glog.Error("Error persisting chunk, requeuing: ", err) s.persistQueue <- req continue @@ -404,7 +382,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { s.purgeSeries(fp, ts) } } - purgeDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond)) + s.purgeDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond)) glog.Info("Done purging old series data.") } } @@ -427,8 +405,8 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime if series, ok := s.fingerprintToSeries.get(fp); ok { if series.purgeOlderThan(beforeTime) && allDropped { s.fingerprintToSeries.del(fp) - numSeries.Dec() - seriesOps.WithLabelValues(memoryPurge).Inc() + s.numSeries.Dec() + s.seriesOps.WithLabelValues(memoryPurge).Inc() s.persistence.unindexMetric(series.metric, fp) } return @@ -444,7 +422,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime if err := s.persistence.dropArchivedMetric(fp); err != nil { glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err) } else { - seriesOps.WithLabelValues(archivePurge).Inc() + s.seriesOps.WithLabelValues(archivePurge).Inc() } } } @@ -573,12 +551,54 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint return metric } +// To expose persistQueueCap as metric: +var ( + persistQueueCapDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"), + "The total capacity of the persist queue.", + nil, nil, + ) + persistQueueCapGauge = prometheus.MustNewConstMetric( + persistQueueCapDesc, prometheus.GaugeValue, persistQueueCap, + ) +) + // Describe implements prometheus.Collector. func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.persistence.Describe(ch) + + ch <- s.persistLatency.Desc() + s.persistErrors.Describe(ch) + ch <- s.persistQueueLength.Desc() + ch <- s.numSeries.Desc() + s.seriesOps.Describe(ch) + ch <- s.ingestedSamplesCount.Desc() + ch <- s.purgeDuration.Desc() + ch <- s.evictionDuration.Desc() + + ch <- persistQueueCapDesc + + ch <- numMemChunksDesc + ch <- numMemChunkDescsDesc } // Collect implements prometheus.Collector. func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { s.persistence.Collect(ch) + + ch <- s.persistLatency + s.persistErrors.Collect(ch) + ch <- s.persistQueueLength + ch <- s.numSeries + s.seriesOps.Collect(ch) + ch <- s.ingestedSamplesCount + ch <- s.purgeDuration + ch <- s.evictionDuration + + ch <- persistQueueCapGauge + + count := atomic.LoadInt64(&numMemChunks) + ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count)) + count = atomic.LoadInt64(&numMemChunkDescs) + ch <- prometheus.MustNewConstMetric(numMemChunkDescsDesc, prometheus.GaugeValue, float64(count)) }