diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 0e8dcc669..f1d2d5d4c 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -15,12 +15,172 @@ package local import ( "io" + "sync" clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/storage/metric" ) +// Instrumentation. +// Note that the metrics are often set in bulk by the caller +// for performance reasons. +var ( + chunkOps = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunk_ops_total", + Help: "The total number of chunk operations by their type.", + }, + []string{opTypeLabel}, + ) + chunkDescOps = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkdesc_ops_total", + Help: "The total number of chunk descriptor operations by their type.", + }, + []string{opTypeLabel}, + ) +) + +const ( + // Op-types for chunkOps. + createAndPin = "create" // A chunkDesc creation with refCount=1. + persistAndUnpin = "persist" + pin = "pin" // Excluding the pin on creation. + unpin = "unpin" // Excluding the unpin on persisting. + clone = "clone" + transcode = "transcode" + purge = "purge" + + // Op-types for chunkOps and chunkDescOps. + evict = "evict" + load = "load" +) + +func init() { + prometheus.MustRegister(chunkOps) + prometheus.MustRegister(chunkDescOps) +} + +type chunkDesc struct { + sync.Mutex + chunk chunk + refCount int + evict bool + chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted. + chunkLastTime clientmodel.Timestamp // Used if chunk is evicted. +} + +// newChunkDesc creates a new chunkDesc pointing to the given chunk (may be +// nil). The refCount of the new chunkDesc is 1. +func newChunkDesc(c chunk) *chunkDesc { + chunkOps.WithLabelValues(createAndPin).Inc() + return &chunkDesc{chunk: c, refCount: 1} +} + +func (cd *chunkDesc) add(s *metric.SamplePair) []chunk { + cd.Lock() + defer cd.Unlock() + + return cd.chunk.add(s) +} + +func (cd *chunkDesc) pin() { + cd.Lock() + defer cd.Unlock() + + cd.refCount++ +} + +func (cd *chunkDesc) unpin() { + cd.Lock() + defer cd.Unlock() + + if cd.refCount == 0 { + panic("cannot unpin already unpinned chunk") + } + cd.refCount-- + if cd.refCount == 0 && cd.evict { + cd.evictNow() + } +} + +func (cd *chunkDesc) firstTime() clientmodel.Timestamp { + cd.Lock() + defer cd.Unlock() + + if cd.chunk == nil { + return cd.chunkFirstTime + } + return cd.chunk.firstTime() +} + +func (cd *chunkDesc) lastTime() clientmodel.Timestamp { + cd.Lock() + defer cd.Unlock() + + if cd.chunk == nil { + return cd.chunkLastTime + } + return cd.chunk.lastTime() +} + +func (cd *chunkDesc) isEvicted() bool { + cd.Lock() + defer cd.Unlock() + + return cd.chunk == nil +} + +func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool { + return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) +} + +// setChunk points this chunkDesc to the given chunk. It panics if +// this chunkDesc already has a chunk set. +func (cd *chunkDesc) setChunk(c chunk) { + cd.Lock() + defer cd.Unlock() + + if cd.chunk != nil { + panic("chunk already set") + } + cd.evict = false + cd.chunk = c +} + +// evictOnUnpin evicts the chunk once unpinned. If it is not pinned when this +// method is called, it evicts the chunk immediately and returns true. If the +// chunk is already evicted when this method is called, it returns true, too. 
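+// In all other cases, it returns false, and the chunk is evicted later, as
+// soon as its refCount drops to zero in unpin.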
+func (cd *chunkDesc) evictOnUnpin() bool { + cd.Lock() + defer cd.Unlock() + + if cd.chunk == nil { + // Already evicted. + return true + } + cd.evict = true + if cd.refCount == 0 { + cd.evictNow() + return true + } + return false +} + +// evictNow is an internal helper method. +func (cd *chunkDesc) evictNow() { + cd.chunkFirstTime = cd.chunk.firstTime() + cd.chunkLastTime = cd.chunk.lastTime() + cd.chunk = nil + chunkOps.WithLabelValues(evict).Inc() +} + // chunk is the interface for all chunks. Chunks are generally not // goroutine-safe. type chunk interface { @@ -62,7 +222,7 @@ type chunkIterator interface { } func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { - numTranscodes.Inc() + chunkOps.WithLabelValues(transcode).Inc() head := dst body := []chunk{} diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go deleted file mode 100644 index 2a276b043..000000000 --- a/storage/local/instrumentation.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2014 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package local - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -const ( - address = "instance" - alive = "alive" - failure = "failure" - outcome = "outcome" - state = "state" - success = "success" - unreachable = "unreachable" -) - -var ( - numSeries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_stored_series_count", - Help: "The number of currently stored series.", - }) - numSamples = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_stored_samples_total", - Help: "The total number of stored samples.", - }) - evictionDuration = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_memory_eviction_duration_milliseconds", - Help: "The duration of the last memory eviction iteration in milliseconds.", - }) - purgeDuration = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_storage_purge_duration_milliseconds", - Help: "The duration of the last storage purge iteration in milliseconds.", - }) - - numTranscodes = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_chunk_transcodes_total", - Help: "The total number of chunk transcodes.", - }) - numPinnedChunks = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_pinned_chunks_count", - Help: "The current number of pinned chunks.", - }) - - persistLatencies = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Name: "prometheus_persist_latency_milliseconds", - Help: "A summary of latencies for persisting each chunk.", - }, []string{outcome}) - persistQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_persist_queue_length", - Help: "The current number of chunks waiting in the persist queue.", - }) - persistQueueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_persist_queue_capacity", - Help: "The total capacity of the persist queue.", - }) -) - -func init() { - prometheus.MustRegister(numSeries) - prometheus.MustRegister(numSamples) - 
prometheus.MustRegister(evictionDuration) - prometheus.MustRegister(purgeDuration) - prometheus.MustRegister(numTranscodes) - prometheus.MustRegister(numPinnedChunks) - prometheus.MustRegister(persistLatencies) - prometheus.MustRegister(persistQueueLength) - prometheus.MustRegister(persistQueueCapacity) - - persistQueueCapacity.Set(float64(persistQueueCap)) -} diff --git a/storage/local/interface.go b/storage/local/interface.go index 88e70ca07..c1bee10e8 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -21,6 +21,15 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// String constants for instrumentation. +const ( + namespace = "prometheus" + subsystem = "local_storage" + + errorLabel = "error" + opTypeLabel = "type" +) + // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. type Storage interface { diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 1d4ceaaef..80164bc1c 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -34,9 +34,6 @@ import ( ) const ( - namespace = "prometheus" - subsystem = "persistence" - seriesFileSuffix = ".db" seriesTempFileSuffix = ".db.tmp" @@ -258,12 +255,6 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]c defer f.Close() chunks := make([]chunk, 0, len(indexes)) - defer func() { - if err == nil { - return - } - }() - typeBuf := make([]byte, 1) for _, idx := range indexes { _, err := f.Seek(p.offsetForChunkIndex(idx), os.SEEK_SET) @@ -343,6 +334,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie } cds = append(cds, cd) } + chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) return cds, nil } @@ -504,10 +496,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) { if err := chunk.unmarshal(r); err != nil { return nil, err } - chunkDescs[i] = &chunkDesc{ - chunk: chunk, - refCount: 1, - } + chunkDescs[i] = newChunkDesc(chunk) } } @@ -546,6 +535,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo if err == io.EOF { // We ran into the end of the file without finding any chunks that should // be kept. Remove the whole file. + chunkOps.WithLabelValues(purge).Add(float64(i)) if err := os.Remove(f.Name()); err != nil { return true, err } @@ -556,6 +546,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo } lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf)) if !lastTime.Before(beforeTime) { + chunkOps.WithLabelValues(purge).Add(float64(i)) break } } diff --git a/storage/local/preload.go b/storage/local/preload.go index b3ba8225a..c5669e6b0 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -106,4 +106,6 @@ func (p *memorySeriesPreloader) Close() { for _, cd := range p.pinnedChunkDescs { cd.unpin() } + chunkOps.WithLabelValues(unpin).Add(float64(len(p.pinnedChunkDescs))) + } diff --git a/storage/local/series.go b/storage/local/series.go index d54f0d8f2..fc17be1f5 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -14,6 +14,7 @@ package local import ( + "math" "sort" "sync" @@ -130,114 +131,6 @@ func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint { return ch } -type chunkDesc struct { - sync.Mutex - chunk chunk - refCount int - evict bool - chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted. - chunkLastTime clientmodel.Timestamp // Used if chunk is evicted. 
-} - -func (cd *chunkDesc) add(s *metric.SamplePair) []chunk { - cd.Lock() - defer cd.Unlock() - - return cd.chunk.add(s) -} - -func (cd *chunkDesc) pin() { - cd.Lock() - defer cd.Unlock() - - numPinnedChunks.Inc() - cd.refCount++ -} - -func (cd *chunkDesc) unpin() { - cd.Lock() - defer cd.Unlock() - - if cd.refCount == 0 { - panic("cannot unpin already unpinned chunk") - } - numPinnedChunks.Dec() - cd.refCount-- - if cd.refCount == 0 && cd.evict { - cd.evictNow() - } -} - -func (cd *chunkDesc) firstTime() clientmodel.Timestamp { - cd.Lock() - defer cd.Unlock() - - if cd.chunk == nil { - return cd.chunkFirstTime - } - return cd.chunk.firstTime() -} - -func (cd *chunkDesc) lastTime() clientmodel.Timestamp { - cd.Lock() - defer cd.Unlock() - - if cd.chunk == nil { - return cd.chunkLastTime - } - return cd.chunk.lastTime() -} - -func (cd *chunkDesc) isEvicted() bool { - cd.Lock() - defer cd.Unlock() - - return cd.chunk == nil -} - -func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool { - return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) -} - -func (cd *chunkDesc) open(c chunk) { - cd.Lock() - defer cd.Unlock() - - if cd.refCount != 0 || cd.chunk != nil { - panic("cannot open already pinned chunk") - } - cd.evict = false - cd.chunk = c - numPinnedChunks.Inc() - cd.refCount++ -} - -// evictOnUnpin evicts the chunk once unpinned. If it is not pinned when this -// method is called, it evicts the chunk immediately and returns true. If the -// chunk is already evicted when this method is called, it returns true, too. -func (cd *chunkDesc) evictOnUnpin() bool { - cd.Lock() - defer cd.Unlock() - - if cd.chunk == nil { - // Already evicted. - return true - } - cd.evict = true - if cd.refCount == 0 { - cd.evictNow() - return true - } - return false -} - -// evictNow is an internal helper method. -func (cd *chunkDesc) evictNow() { - cd.chunkFirstTime = cd.chunk.firstTime() - cd.chunkLastTime = cd.chunk.lastTime() - cd.chunk = nil -} - type memorySeries struct { metric clientmodel.Metric // Sorted by start time, overlapping chunk ranges are forbidden. @@ -268,10 +161,7 @@ func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries { // The caller must have locked the fingerprint of the series. func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) { if len(s.chunkDescs) == 0 || s.headChunkPersisted { - newHead := &chunkDesc{ - chunk: newDeltaEncodedChunk(d1, d0, true), - refCount: 1, - } + newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true)) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkPersisted = false } @@ -290,10 +180,7 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per queuePersist(s.head()) for i, c := range chunks[1:] { - cd := &chunkDesc{ - chunk: c, - refCount: 1, - } + cd := newChunkDesc(c) s.chunkDescs = append(s.chunkDescs, cd) // The last chunk is still growing. 
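+		// Hence, it must not be queued for persisting yet; only its
+		// already completed predecessors may be.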
if i < len(chunks[1:])-1 { @@ -344,6 +231,7 @@ func (s *memorySeries) evictOlderThan( lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) if lenToKeep < len(s.chunkDescs) { s.chunkDescsLoaded = false + chunkDescOps.WithLabelValues(evict).Add(float64(len(s.chunkDescs) - lenToKeep)) s.chunkDescs = append( make([]*chunkDesc, 0, lenToKeep), s.chunkDescs[len(s.chunkDescs)-lenToKeep:]..., @@ -397,7 +285,7 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool { for i := 0; i < keepIdx; i++ { s.chunkDescs[i].evictOnUnpin() } - s.chunkDescs = s.chunkDescs[keepIdx:] + s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...) return len(s.chunkDescs) == 0 } @@ -406,30 +294,30 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) for _, idx := range indexes { - pinnedChunkDescs = append(pinnedChunkDescs, s.chunkDescs[idx]) - if s.chunkDescs[idx].isEvicted() { + cd := s.chunkDescs[idx] + pinnedChunkDescs = append(pinnedChunkDescs, cd) + cd.pin() // Have to pin everything first to prevent concurrent evictOnUnpin later. + if cd.isEvicted() { loadIndexes = append(loadIndexes, idx) - } else { - s.chunkDescs[idx].pin() } } + chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs))) if len(loadIndexes) > 0 { fp := s.metric.Fingerprint() chunks, err := p.loadChunks(fp, loadIndexes) if err != nil { - // Unpin any pinned chunks that were already loaded. + // Unpin the chunks since we won't return them as pinned chunks now. for _, cd := range pinnedChunkDescs { - if !cd.isEvicted() { - cd.unpin() - } + cd.unpin() } + chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) return nil, err } for i, c := range chunks { - cd := s.chunkDescs[loadIndexes[i]] - cd.open(c) + s.chunkDescs[loadIndexes[i]].setChunk(c) } + chunkOps.WithLabelValues(load).Add(float64(len(chunks))) } return pinnedChunkDescs, nil @@ -472,7 +360,7 @@ func (s *memorySeries) preloadChunksForRange( from clientmodel.Timestamp, through clientmodel.Timestamp, fp clientmodel.Fingerprint, p *persistence, ) ([]*chunkDesc, error) { - firstChunkDescTime := through + firstChunkDescTime := clientmodel.Timestamp(math.MaxInt64) if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() } @@ -516,6 +404,7 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator { for i, cd := range s.chunkDescs { if !cd.isEvicted() { if i == len(s.chunkDescs)-1 { + chunkOps.WithLabelValues(clone).Inc() chunks = append(chunks, cd.chunk.clone()) } else { chunks = append(chunks, cd.chunk) diff --git a/storage/local/storage.go b/storage/local/storage.go index eb94fb2d7..a9cc11c68 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -19,14 +19,102 @@ import ( "time" "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/storage/metric" ) const persistQueueCap = 1024 +// Instrumentation. 
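+// All metric names share the namespace and subsystem constants defined in
+// interface.go; the metrics are registered with the default registry in the
+// init function below.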
+var ( + persistLatency = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_latency_microseconds", + Help: "A summary of latencies for persisting each chunk.", + }) + persistErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_errors_total", + Help: "A counter of errors persisting chunks.", + }, + []string{errorLabel}, + ) + persistQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_queue_length", + Help: "The current number of chunks waiting in the persist queue.", + }) + persistQueueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "persist_queue_capacity", + Help: "The total capacity of the persist queue.", + }) + numSeries = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "memory_series", + Help: "The current number of series in memory.", + }) + seriesOps = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_ops_total", + Help: "The total number of series operations by their type.", + }, + []string{opTypeLabel}, + ) + ingestedSamplesCount = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "ingested_samples_total", + Help: "The total number of samples ingested.", + }) + purgeDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "purge_duration_milliseconds", + Help: "The duration of the last storage purge iteration in milliseconds.", + }) + evictionDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "eviction_duration_milliseconds", + Help: "The duration of the last memory eviction iteration in milliseconds.", + }) +) + +const ( + // Op-types for seriesOps. + create = "create" + archive = "archive" + unarchive = "unarchive" + memoryPurge = "purge_from_memory" + archivePurge = "purge_from_archive" +) + +func init() { + prometheus.MustRegister(persistLatency) + prometheus.MustRegister(persistErrors) + prometheus.MustRegister(persistQueueLength) + prometheus.MustRegister(persistQueueCapacity) + prometheus.MustRegister(numSeries) + prometheus.MustRegister(seriesOps) + prometheus.MustRegister(ingestedSamplesCount) + prometheus.MustRegister(purgeDuration) + prometheus.MustRegister(evictionDuration) + + persistQueueCapacity.Set(float64(persistQueueCap)) +} + type storageState uint const ( @@ -108,7 +196,7 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { s.appendSample(sample) } - numSamples.Add(float64(len(samples))) + ingestedSamplesCount.Add(float64(len(samples))) } func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { @@ -130,14 +218,16 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl if err != nil { glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err) } - if !unarchived { + if unarchived { + seriesOps.WithLabelValues(unarchive).Inc() + } else { // This was a genuinely new series, so index the metric. 
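+			// (A series unarchived above is still indexed from
+			// before its archival, so it needs no re-indexing.)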
s.persistence.indexMetric(m, fp) + seriesOps.WithLabelValues(create).Inc() } series = newMemorySeries(m, !unarchived) s.fingerprintToSeries.put(fp, series) - numSeries.Set(float64(s.fingerprintToSeries.length())) - + numSeries.Inc() } return series } @@ -204,7 +294,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { defer func(begin time.Time) { - evictionDuration.Set(float64(time.Since(begin) / time.Millisecond)) + evictionDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond)) }(time.Now()) for m := range s.fingerprintToSeries.iter() { @@ -214,38 +304,33 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { m.fp, s.persistQueue, ) { s.fingerprintToSeries.del(m.fp) + numSeries.Dec() if err := s.persistence.archiveMetric( m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(), ); err != nil { glog.Errorf("Error archiving metric %v: %v", m.series.metric, err) + } else { + seriesOps.WithLabelValues(archive).Inc() } } s.fpLocker.Unlock(m.fp) } } -func recordPersist(start time.Time, err error) { - outcome := success - if err != nil { - outcome = failure - } - persistLatencies.WithLabelValues(outcome).Observe(float64(time.Since(start) / time.Millisecond)) -} - func (s *memorySeriesStorage) handlePersistQueue() { for req := range s.persistQueue { persistQueueLength.Set(float64(len(s.persistQueue))) - - //glog.Info("Persist request: ", *req.fingerprint) start := time.Now() err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk) - recordPersist(start, err) + persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond)) if err != nil { + persistErrors.WithLabelValues(err.Error()).Inc() glog.Error("Error persisting chunk, requeuing: ", err) s.persistQueue <- req continue } req.chunkDesc.unpin() + chunkOps.WithLabelValues(persistAndUnpin).Inc() } s.persistDone <- true } @@ -319,7 +404,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { s.purgeSeries(fp, ts) } } - purgeDuration.Set(float64(time.Since(begin) / time.Millisecond)) + purgeDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond)) glog.Info("Done purging old series data.") } } @@ -342,6 +427,8 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime if series, ok := s.fingerprintToSeries.get(fp); ok { if series.purgeOlderThan(beforeTime) && allDropped { s.fingerprintToSeries.del(fp) + numSeries.Dec() + seriesOps.WithLabelValues(memoryPurge).Inc() s.persistence.unindexMetric(series.metric, fp) } return @@ -349,13 +436,15 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime // If we arrive here, nothing was in memory, so the metric must have // been archived. Drop the archived metric if there are no persisted - // chunks left. If we do drop the archived metric, we should update the - // archivedFingerprintToTimeRange index according to the remaining + // chunks left. If we don't drop the archived metric, we should update + // the archivedFingerprintToTimeRange index according to the remaining // chunks, but it's probably not worth the effort. Queries going beyond // the purge cut-off can be truncated in a more direct fashion. 
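+	// Note that only a successful drop is counted below as an archivePurge
+	// series operation.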
if allDropped { if err := s.persistence.dropArchivedMetric(fp); err != nil { glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err) + } else { + seriesOps.WithLabelValues(archivePurge).Inc() } } } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 7edc602a7..31acfeddf 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -35,7 +35,7 @@ const ( // String constants for instrumentation. const ( namespace = "prometheus" - subsystem = "remote_tsdb" + subsystem = "remote_storage" result = "result" success = "success"