From e87449b59da42c69c3b8b52df6147160f33977b3 Mon Sep 17 00:00:00 2001
From: Callum Styan
Date: Tue, 23 Apr 2019 01:49:17 -0700
Subject: [PATCH] Remote Write: Queue Manager specific metrics shouldn't exist if the queue no longer exists (#5445)

* Unregister remote write queue manager specific metrics when stopping the
  queue manager.

* Use DeleteLabelValues instead of Unregister to remove queue and watcher
  related metrics when we stop them. Create those metrics in the structs'
  Start functions rather than in their constructors because of the ordering
  of creation, start, and stop in remote storage ApplyConfig.

* Add a setMetrics function to the WAL watcher so we can set the watcher's
  metrics in its Start function but not have to call Start in some tests
  (which causes a data race).

Signed-off-by: Callum Styan
---
 storage/remote/queue_manager.go    | 50 ++++++++++++++++++++----------
 storage/remote/wal_watcher.go      | 25 ++++++++++++---
 storage/remote/wal_watcher_test.go | 10 +++---
 3 files changed, 60 insertions(+), 25 deletions(-)

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 9dd50ee601..0a9dff82df 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -189,6 +189,7 @@ type QueueManager struct {
 	sentBatchDuration     prometheus.Observer
 	succeededSamplesTotal prometheus.Counter
 	retriedSamplesTotal   prometheus.Counter
+	shardCapacity         prometheus.Gauge
 }
 
 // NewQueueManager builds a new QueueManager.
@@ -219,27 +220,11 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg
 		samplesDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
-
-		highestSentTimestampMetric: &maxGauge{
-			Gauge: queueHighestSentTimestamp.WithLabelValues(name),
-		},
-		pendingSamplesMetric:  queuePendingSamples.WithLabelValues(name),
-		enqueueRetriesMetric:  enqueueRetriesTotal.WithLabelValues(name),
-		droppedSamplesTotal:   droppedSamplesTotal.WithLabelValues(name),
-		numShardsMetric:       numShards.WithLabelValues(name),
-		failedSamplesTotal:    failedSamplesTotal.WithLabelValues(name),
-		sentBatchDuration:     sentBatchDuration.WithLabelValues(name),
-		succeededSamplesTotal: succeededSamplesTotal.WithLabelValues(name),
-		retriedSamplesTotal:   retriedSamplesTotal.WithLabelValues(name),
 	}
 
 	t.watcher = NewWALWatcher(logger, name, t, walDir)
 	t.shards = t.newShards()
 
-	// Initialise some metrics.
-	shardCapacity.WithLabelValues(name).Set(float64(t.cfg.Capacity))
-	t.pendingSamplesMetric.Set(0)
-
 	return t
 }
 
@@ -307,6 +292,27 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
+	// Set up the QueueManager's metrics. We do this here rather than in the
+	// constructor because of the ordering of creating Queue Managers, stopping them,
+	// and then starting new ones in storage/remote/storage.go ApplyConfig.
+	name := t.client.Name()
+	t.highestSentTimestampMetric = &maxGauge{
+		Gauge: queueHighestSentTimestamp.WithLabelValues(name),
+	}
+	t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name)
+	t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name)
+	t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name)
+	t.numShardsMetric = numShards.WithLabelValues(name)
+	t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name)
+	t.sentBatchDuration = sentBatchDuration.WithLabelValues(name)
+	t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name)
+	t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name)
+	t.shardCapacity = shardCapacity.WithLabelValues(name)
+
+	// Initialise some metrics.
+	t.shardCapacity.Set(float64(t.cfg.Capacity))
+	t.pendingSamplesMetric.Set(0)
+
 	t.shards.start(t.numShards)
 	t.watcher.Start()
 
@@ -335,6 +341,18 @@ func (t *QueueManager) Stop() {
 	for _, labels := range t.seriesLabels {
 		release(labels)
 	}
+	// Delete metrics so we don't have alerts for queues that are gone.
+	name := t.client.Name()
+	queueHighestSentTimestamp.DeleteLabelValues(name)
+	queuePendingSamples.DeleteLabelValues(name)
+	enqueueRetriesTotal.DeleteLabelValues(name)
+	droppedSamplesTotal.DeleteLabelValues(name)
+	numShards.DeleteLabelValues(name)
+	failedSamplesTotal.DeleteLabelValues(name)
+	sentBatchDuration.DeleteLabelValues(name)
+	succeededSamplesTotal.DeleteLabelValues(name)
+	retriedSamplesTotal.DeleteLabelValues(name)
+	shardCapacity.DeleteLabelValues(name)
 }
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go
index 4c58264158..3dba373b48 100644
--- a/storage/remote/wal_watcher.go
+++ b/storage/remote/wal_watcher.go
@@ -128,18 +128,25 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
 		quit: make(chan struct{}),
 		done: make(chan struct{}),
 
-		recordsReadMetric:       watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: name}),
-		recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
-		samplesSentPreTailing:   watcherSamplesSentPreTailing.WithLabelValues(name),
-		currentSegmentMetric:    watcherCurrentSegment.WithLabelValues(name),
-
 		maxSegment: -1,
 	}
 }
 
+func (w *WALWatcher) setMetrics() {
+	// Set up the WAL Watcher's metrics. We do this here rather than in the
+	// constructor because of the ordering of creating Queue Managers,
+	// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
+	w.recordsReadMetric = watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: w.name})
+	w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name)
+	w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name)
+	w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name)
+}
+
 // Start the WALWatcher.
 func (w *WALWatcher) Start() {
+	w.setMetrics()
 	level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
+
 	go w.loop()
 }
 
@@ -147,6 +154,14 @@ func (w *WALWatcher) Start() {
 func (w *WALWatcher) Stop() {
 	close(w.quit)
 	<-w.done
+
+	// The records read metric has one series per record type (series and samples); delete both.
+	watcherRecordsRead.DeleteLabelValues(w.name, "series")
+	watcherRecordsRead.DeleteLabelValues(w.name, "samples")
+	watcherRecordDecodeFails.DeleteLabelValues(w.name)
+	watcherSamplesSentPreTailing.DeleteLabelValues(w.name)
+	watcherCurrentSegment.DeleteLabelValues(w.name)
+
 	level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
 }
 
diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go
index a802a76462..31f27d3807 100644
--- a/storage/remote/wal_watcher_test.go
+++ b/storage/remote/wal_watcher_test.go
@@ -102,8 +102,6 @@ func TestTailSamples(t *testing.T) {
 	err = os.Mkdir(wdir, 0777)
 	testutil.Ok(t, err)
 
-	// os.Create(wal.SegmentName(wdir, 30))
-
 	enc := tsdb.RecordEncoder{}
 	w, err := wal.NewSize(nil, nil, wdir, 128*pageSize)
 	testutil.Ok(t, err)
@@ -139,6 +137,9 @@ func TestTailSamples(t *testing.T) {
 	wt := newWriteToMock()
 	watcher := NewWALWatcher(nil, "", wt, dir)
 	watcher.startTime = now.UnixNano()
+
+	// Set the Watcher's metrics so they're not nil pointers.
+	watcher.setMetrics()
 	for i := first; i <= last; i++ {
 		segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i))
 		testutil.Ok(t, err)
@@ -148,14 +149,12 @@ func TestTailSamples(t *testing.T) {
 		// Use tail true so we can ensure we got the right number of samples.
 		watcher.readSegment(reader, i, true)
 	}
-	go watcher.Start()
 
 	expectedSeries := seriesCount
 	expectedSamples := seriesCount * samplesCount
 	retry(t, defaultRetryInterval, defaultRetries, func() bool {
 		return wt.checkNumLabels() >= expectedSeries
 	})
-	watcher.Stop()
 	testutil.Equals(t, expectedSeries, wt.checkNumLabels())
 	testutil.Equals(t, expectedSamples, wt.samplesAppended)
 }
@@ -424,6 +423,9 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 	watcher := NewWALWatcher(nil, "", wt, dir)
 	watcher.maxSegment = -1
 
+	// Set the Watcher's metrics so they're not nil pointers.
+	watcher.setMetrics()
+
 	lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir)
 	testutil.Ok(t, err)
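Illustration (not part of the patch): the pattern the change adopts is that per-queue metric children are created from shared metric vectors with WithLabelValues when a QueueManager or WALWatcher starts, and removed with DeleteLabelValues when it stops, so series for queues that have been dropped by ApplyConfig disappear from scrapes instead of lingering at their last value. The following is a minimal standalone sketch of that lifecycle using client_golang; the metric name example_pending_samples, the "queue" label, and the queue type are hypothetical stand-ins, not identifiers from the Prometheus source.

package main

import "github.com/prometheus/client_golang/prometheus"

// pendingSamples is a package-level vector, registered once; each queue gets
// its own child series identified by the "queue" label. (Name and label are
// illustrative only.)
var pendingSamples = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_pending_samples",
		Help: "Samples waiting to be sent, per queue (illustrative).",
	},
	[]string{"queue"},
)

func init() {
	prometheus.MustRegister(pendingSamples)
}

// queue stands in for a QueueManager: the labeled child is resolved in Start
// rather than in the constructor, and deleted again in Stop, so a removed
// queue leaves no stale series behind.
type queue struct {
	name    string
	pending prometheus.Gauge
}

func (q *queue) Start() {
	// WithLabelValues creates (or fetches) the child for this queue name.
	q.pending = pendingSamples.WithLabelValues(q.name)
	q.pending.Set(0)
}

func (q *queue) Stop() {
	// DeleteLabelValues drops the child, removing the series from scrapes.
	pendingSamples.DeleteLabelValues(q.name)
}

func main() {
	q := &queue{name: "remote-a"}
	q.Start()
	q.Stop() // example_pending_samples{queue="remote-a"} is gone after this.
}

For the curried watcher vector (watcherRecordsRead, curried with the queue name via MustCurryWith), the patch removes children through the uncurried vector by passing both label values, which is why WALWatcher.Stop deletes the "series" and "samples" children explicitly.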