diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index e6c1e0ac89..fbdd28d8d0 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -46,152 +46,134 @@ const (
 )
 
 type queueManagerMetrics struct {
-    succeededSamplesTotal     *prometheus.CounterVec
-    failedSamplesTotal        *prometheus.CounterVec
-    retriedSamplesTotal       *prometheus.CounterVec
-    droppedSamplesTotal       *prometheus.CounterVec
-    enqueueRetriesTotal       *prometheus.CounterVec
-    sentBatchDuration         *prometheus.HistogramVec
-    queueHighestSentTimestamp *prometheus.GaugeVec
-    queuePendingSamples       *prometheus.GaugeVec
-    shardCapacity             *prometheus.GaugeVec
-    numShards                 *prometheus.GaugeVec
-    maxNumShards              *prometheus.GaugeVec
-    minNumShards              *prometheus.GaugeVec
-    desiredNumShards          *prometheus.GaugeVec
-    bytesSent                 *prometheus.CounterVec
+    reg prometheus.Registerer
+
+    succeededSamplesTotal prometheus.Counter
+    failedSamplesTotal    prometheus.Counter
+    retriedSamplesTotal   prometheus.Counter
+    droppedSamplesTotal   prometheus.Counter
+    enqueueRetriesTotal   prometheus.Counter
+    sentBatchDuration     prometheus.Histogram
+    highestSentTimestamp  *maxGauge
+    pendingSamples        prometheus.Gauge
+    shardCapacity         prometheus.Gauge
+    numShards             prometheus.Gauge
+    maxNumShards          prometheus.Gauge
+    minNumShards          prometheus.Gauge
+    desiredNumShards      prometheus.Gauge
+    bytesSent             prometheus.Counter
 }
 
-func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
-    m := &queueManagerMetrics{}
+func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics {
+    m := &queueManagerMetrics{
+        reg: r,
+    }
+    constLabels := prometheus.Labels{
+        remoteName: rn,
+        endpoint:   e,
+    }
 
-    m.succeededSamplesTotal = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "succeeded_samples_total",
-            Help:      "Total number of samples successfully sent to remote storage.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.failedSamplesTotal = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "failed_samples_total",
-            Help:      "Total number of samples which failed on send to remote storage, non-recoverable errors.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.retriedSamplesTotal = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "retried_samples_total",
-            Help:      "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.droppedSamplesTotal = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "dropped_samples_total",
-            Help:      "Total number of samples which were dropped after being read from the WAL before being sent via remote write.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.enqueueRetriesTotal = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "enqueue_retries_total",
-            Help:      "Total number of times enqueue has failed because a shards queue was full.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.sentBatchDuration = prometheus.NewHistogramVec(
-        prometheus.HistogramOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "sent_batch_duration_seconds",
-            Help:      "Duration of sample batch send calls to the remote storage.",
-            Buckets:   prometheus.DefBuckets,
-        },
-        []string{remoteName, endpoint},
-    )
-    m.queueHighestSentTimestamp = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "queue_highest_sent_timestamp_seconds",
-            Help:      "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.queuePendingSamples = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "pending_samples",
-            Help:      "The number of samples pending in the queues shards to be sent to the remote storage.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.shardCapacity = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "shard_capacity",
-            Help:      "The capacity of each shard of the queue used for parallel sending to the remote storage.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.numShards = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "shards",
-            Help:      "The number of shards used for parallel sending to the remote storage.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.maxNumShards = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "shards_max",
-            Help:      "The maximum number of shards that the queue is allowed to run.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.minNumShards = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "shards_min",
-            Help:      "The minimum number of shards that the queue is allowed to run.",
-        },
-        []string{remoteName, endpoint},
-    )
-    m.desiredNumShards = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "shards_desired",
-            Help:      "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
samples out.", - }, - []string{remoteName, endpoint}, - ) - m.bytesSent = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_bytes_total", - Help: "The total number of bytes sent by the queue.", - }, - []string{remoteName, endpoint}, - ) + m.succeededSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "succeeded_samples_total", + Help: "Total number of samples successfully sent to remote storage.", + ConstLabels: constLabels, + }) + m.failedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_samples_total", + Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.", + ConstLabels: constLabels, + }) + m.retriedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "retried_samples_total", + Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.", + ConstLabels: constLabels, + }) + m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dropped_samples_total", + Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.", + ConstLabels: constLabels, + }) + m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "enqueue_retries_total", + Help: "Total number of times enqueue has failed because a shards queue was full.", + ConstLabels: constLabels, + }) + m.sentBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send calls to the remote storage.", + Buckets: prometheus.DefBuckets, + ConstLabels: constLabels, + }) + m.highestSentTimestamp = &maxGauge{ + Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_highest_sent_timestamp_seconds", + Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.", + ConstLabels: constLabels, + }), + } + m.pendingSamples = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "pending_samples", + Help: "The number of samples pending in the queues shards to be sent to the remote storage.", + ConstLabels: constLabels, + }) + m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shard_capacity", + Help: "The capacity of each shard of the queue used for parallel sending to the remote storage.", + ConstLabels: constLabels, + }) + m.numShards = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards", + Help: "The number of shards used for parallel sending to the remote storage.", + ConstLabels: constLabels, + }) + m.maxNumShards = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards_max", + Help: "The maximum number of shards that the queue is allowed to run.", + ConstLabels: constLabels, + }) + m.minNumShards = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards_min", + Help: "The minimum number of 
+        ConstLabels: constLabels,
+    })
+    m.desiredNumShards = prometheus.NewGauge(prometheus.GaugeOpts{
+        Namespace:   namespace,
+        Subsystem:   subsystem,
+        Name:        "shards_desired",
+        Help:        "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
+        ConstLabels: constLabels,
+    })
+    m.bytesSent = prometheus.NewCounter(prometheus.CounterOpts{
+        Namespace:   namespace,
+        Subsystem:   subsystem,
+        Name:        "sent_bytes_total",
+        Help:        "The total number of bytes sent by the queue.",
+        ConstLabels: constLabels,
+    })
 
     if r != nil {
         r.MustRegister(
@@ -201,8 +183,8 @@ func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
             m.droppedSamplesTotal,
             m.enqueueRetriesTotal,
             m.sentBatchDuration,
-            m.queueHighestSentTimestamp,
-            m.queuePendingSamples,
+            m.highestSentTimestamp,
+            m.pendingSamples,
             m.shardCapacity,
             m.numShards,
             m.maxNumShards,
@@ -214,6 +196,25 @@ func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
     return m
 }
 
+func (m *queueManagerMetrics) unregister() {
+    if m.reg != nil {
+        m.reg.Unregister(m.succeededSamplesTotal)
+        m.reg.Unregister(m.failedSamplesTotal)
+        m.reg.Unregister(m.retriedSamplesTotal)
+        m.reg.Unregister(m.droppedSamplesTotal)
+        m.reg.Unregister(m.enqueueRetriesTotal)
+        m.reg.Unregister(m.sentBatchDuration)
+        m.reg.Unregister(m.highestSentTimestamp)
+        m.reg.Unregister(m.pendingSamples)
+        m.reg.Unregister(m.shardCapacity)
+        m.reg.Unregister(m.numShards)
+        m.reg.Unregister(m.maxNumShards)
+        m.reg.Unregister(m.minNumShards)
+        m.reg.Unregister(m.desiredNumShards)
+        m.reg.Unregister(m.bytesSent)
+    }
+}
+
 // StorageClient defines an interface for sending a batch of samples to an
 // external timeseries database.
 type StorageClient interface {
@@ -255,25 +256,23 @@ type QueueManager struct {
     samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
 
-    metrics                    *queueManagerMetrics
-    highestSentTimestampMetric *maxGauge
-    pendingSamplesMetric       prometheus.Gauge
-    enqueueRetriesMetric       prometheus.Counter
-    droppedSamplesTotal        prometheus.Counter
-    numShardsMetric            prometheus.Gauge
-    failedSamplesTotal         prometheus.Counter
-    sentBatchDuration          prometheus.Observer
-    succeededSamplesTotal      prometheus.Counter
-    retriedSamplesTotal        prometheus.Counter
-    shardCapacity              prometheus.Gauge
-    maxNumShards               prometheus.Gauge
-    minNumShards               prometheus.Gauge
-    desiredNumShards           prometheus.Gauge
-    bytesSent                  prometheus.Counter
+    metrics *queueManagerMetrics
 }
 
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
+func NewQueueManager(
+    metrics *queueManagerMetrics,
+    watcherMetrics *wal.WatcherMetrics,
+    readerMetrics *wal.LiveReaderMetrics,
+    logger log.Logger,
+    walDir string,
+    samplesIn *ewmaRate,
+    cfg config.QueueConfig,
+    externalLabels labels.Labels,
+    relabelConfigs []*relabel.Config,
+    client StorageClient,
+    flushDeadline time.Duration,
+) *QueueManager {
     if logger == nil {
         logger = log.NewNopLogger()
     }
@@ -317,7 +316,7 @@ outer:
         t.seriesMtx.Lock()
         lbls, ok := t.seriesLabels[s.Ref]
         if !ok {
-            t.droppedSamplesTotal.Inc()
+            t.metrics.droppedSamplesTotal.Inc()
             t.samplesDropped.incr(1)
             if _, ok := t.droppedSeries[s.Ref]; !ok {
                 level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
@@ -343,7 +342,7 @@ outer:
                 continue outer
             }
 
-            t.enqueueRetriesMetric.Inc()
+            t.metrics.enqueueRetriesTotal.Inc()
             time.Sleep(time.Duration(backoff))
             backoff = backoff * 2
             if backoff > t.cfg.MaxBackoff {
@@ -357,34 +356,12 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
-    // Setup the QueueManagers metrics. We do this here rather than in the
-    // constructor because of the ordering of creating Queue Managers's, stopping them,
-    // and then starting new ones in storage/remote/storage.go ApplyConfig.
-    name := t.client().Name()
-    ep := t.client().Endpoint()
-    t.highestSentTimestampMetric = &maxGauge{
-        Gauge: t.metrics.queueHighestSentTimestamp.WithLabelValues(name, ep),
-    }
-    t.pendingSamplesMetric = t.metrics.queuePendingSamples.WithLabelValues(name, ep)
-    t.enqueueRetriesMetric = t.metrics.enqueueRetriesTotal.WithLabelValues(name, ep)
-    t.droppedSamplesTotal = t.metrics.droppedSamplesTotal.WithLabelValues(name, ep)
-    t.numShardsMetric = t.metrics.numShards.WithLabelValues(name, ep)
-    t.failedSamplesTotal = t.metrics.failedSamplesTotal.WithLabelValues(name, ep)
-    t.sentBatchDuration = t.metrics.sentBatchDuration.WithLabelValues(name, ep)
-    t.succeededSamplesTotal = t.metrics.succeededSamplesTotal.WithLabelValues(name, ep)
-    t.retriedSamplesTotal = t.metrics.retriedSamplesTotal.WithLabelValues(name, ep)
-    t.shardCapacity = t.metrics.shardCapacity.WithLabelValues(name, ep)
-    t.maxNumShards = t.metrics.maxNumShards.WithLabelValues(name, ep)
-    t.minNumShards = t.metrics.minNumShards.WithLabelValues(name, ep)
-    t.desiredNumShards = t.metrics.desiredNumShards.WithLabelValues(name, ep)
-    t.bytesSent = t.metrics.bytesSent.WithLabelValues(name, ep)
-
     // Initialise some metrics.
-    t.shardCapacity.Set(float64(t.cfg.Capacity))
-    t.pendingSamplesMetric.Set(0)
-    t.maxNumShards.Set(float64(t.cfg.MaxShards))
-    t.minNumShards.Set(float64(t.cfg.MinShards))
-    t.desiredNumShards.Set(float64(t.cfg.MinShards))
+    t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
+    t.metrics.pendingSamples.Set(0)
+    t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))
+    t.metrics.minNumShards.Set(float64(t.cfg.MinShards))
+    t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards))
 
     t.shards.start(t.numShards)
     t.watcher.Start()
@@ -414,22 +391,7 @@ func (t *QueueManager) Stop() {
         releaseLabels(labels)
     }
     t.seriesMtx.Unlock()
-    // Delete metrics so we don't have alerts for queues that are gone.
-    name := t.client().Name()
-    ep := t.client().Endpoint()
-    t.metrics.queueHighestSentTimestamp.DeleteLabelValues(name, ep)
-    t.metrics.queuePendingSamples.DeleteLabelValues(name, ep)
-    t.metrics.enqueueRetriesTotal.DeleteLabelValues(name, ep)
-    t.metrics.droppedSamplesTotal.DeleteLabelValues(name, ep)
-    t.metrics.numShards.DeleteLabelValues(name, ep)
-    t.metrics.failedSamplesTotal.DeleteLabelValues(name, ep)
-    t.metrics.sentBatchDuration.DeleteLabelValues(name, ep)
-    t.metrics.succeededSamplesTotal.DeleteLabelValues(name, ep)
-    t.metrics.retriedSamplesTotal.DeleteLabelValues(name, ep)
-    t.metrics.shardCapacity.DeleteLabelValues(name, ep)
-    t.metrics.maxNumShards.DeleteLabelValues(name, ep)
-    t.metrics.minNumShards.DeleteLabelValues(name, ep)
-    t.metrics.desiredNumShards.DeleteLabelValues(name, ep)
+    t.metrics.unregister()
 }
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
@@ -597,7 +559,7 @@ func (t *QueueManager) calculateDesiredShards() int {
         samplesKeptRatio   = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate)
         samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
         samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
-        highestSent        = t.highestSentTimestampMetric.Get()
+        highestSent        = t.metrics.highestSentTimestamp.Get()
         highestRecv        = highestTimestamp.Get()
         delay              = highestRecv - highestSent
         samplesPending     = delay * samplesInRate * samplesKeptRatio
@@ -616,7 +578,7 @@ func (t *QueueManager) calculateDesiredShards() int {
         timePerSample = samplesOutDuration / samplesOutRate
         desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
     )
-    t.desiredNumShards.Set(desiredShards)
+    t.metrics.desiredNumShards.Set(desiredShards)
     level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
         "samplesInRate", samplesInRate,
         "samplesOutRate", samplesOutRate,
@@ -726,7 +688,7 @@ func (s *shards) start(n int) {
     for i := 0; i < n; i++ {
         go s.runShard(hardShutdownCtx, i, newQueues[i])
     }
-    s.qm.numShardsMetric.Set(float64(n))
+    s.qm.metrics.numShards.Set(float64(n))
 }
 
 // stop the shards; subsequent call to enqueue will return false.
@@ -820,7 +782,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
             if nPending > 0 {
                 level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", nPending)
                 s.sendSamples(ctx, pendingSamples[:nPending], &buf)
-                s.qm.pendingSamplesMetric.Sub(float64(nPending))
+                s.qm.metrics.pendingSamples.Sub(float64(nPending))
                 level.Debug(s.qm.logger).Log("msg", "Done flushing.")
             }
             return
@@ -833,12 +795,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
             pendingSamples[nPending].Samples[0].Timestamp = sample.t
             pendingSamples[nPending].Samples[0].Value = sample.v
             nPending++
-            s.qm.pendingSamplesMetric.Inc()
+            s.qm.metrics.pendingSamples.Inc()
 
             if nPending >= max {
                 s.sendSamples(ctx, pendingSamples, &buf)
                 nPending = 0
-                s.qm.pendingSamplesMetric.Sub(float64(max))
+                s.qm.metrics.pendingSamples.Sub(float64(max))
 
                 stop()
                 timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -848,7 +810,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
             if nPending > 0 {
                 level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", nPending, "shard", shardNum)
                 s.sendSamples(ctx, pendingSamples[:nPending], &buf)
-                s.qm.pendingSamplesMetric.Sub(float64(nPending))
+                s.qm.metrics.pendingSamples.Sub(float64(nPending))
                 nPending = 0
             }
             timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -861,7 +823,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
     err := s.sendSamplesWithBackoff(ctx, samples, buf)
     if err != nil {
         level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
-        s.qm.failedSamplesTotal.Add(float64(len(samples)))
+        s.qm.metrics.failedSamplesTotal.Add(float64(len(samples)))
     }
 
     // These counters are used to calculate the dynamic sharding, and as such
@@ -891,19 +853,19 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
         begin := time.Now()
         err := s.qm.client().Store(ctx, req)
 
-        s.qm.sentBatchDuration.Observe(time.Since(begin).Seconds())
+        s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
 
         if err == nil {
-            s.qm.succeededSamplesTotal.Add(float64(len(samples)))
-            s.qm.bytesSent.Add(float64(len(req)))
-            s.qm.highestSentTimestampMetric.Set(float64(highest / 1000))
+            s.qm.metrics.succeededSamplesTotal.Add(float64(len(samples)))
+            s.qm.metrics.bytesSent.Add(float64(len(req)))
+            s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
             return nil
         }
 
         if _, ok := err.(recoverableError); !ok {
             return err
         }
-        s.qm.retriedSamplesTotal.Add(float64(len(samples)))
+        s.qm.metrics.retriedSamplesTotal.Add(float64(len(samples)))
         level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
 
         time.Sleep(time.Duration(backoff))
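The highestSentTimestamp field introduced above is a *maxGauge, a type this diff uses but does not define (it lives elsewhere in the storage/remote package): sendSamplesWithBackoff advances it with Set, and calculateDesiredShards reads it back with Get. A minimal sketch of the contract the code above assumes — a gauge that only ratchets forward and whose current value is readable without a registry gather; the field names here are illustrative, not taken from this diff:

package remote

import (
    "sync"

    "github.com/prometheus/client_golang/prometheus"
)

// maxGauge (assumed shape): a gauge whose value only ever moves forward,
// so the queue can record the highest timestamp it has successfully sent.
type maxGauge struct {
    mtx   sync.Mutex
    value float64
    prometheus.Gauge
}

// Set records value only if it exceeds the current maximum.
func (m *maxGauge) Set(value float64) {
    m.mtx.Lock()
    defer m.mtx.Unlock()
    if value > m.value {
        m.value = value
        m.Gauge.Set(value)
    }
}

// Get returns the current maximum without gathering from the registry,
// which is what calculateDesiredShards relies on.
func (m *maxGauge) Get() float64 {
    m.mtx.Lock()
    defer m.mtx.Unlock()
    return m.value
}

Wrapping a plain prometheus.Gauge this way lets one object serve both as the exported metric and as cheap shared state for the sharding calculation.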
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 78c7fa3bee..323ee7e5e3 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -22,6 +22,7 @@ import (
     "reflect"
     "sort"
     "strconv"
+    "strings"
     "sync"
     "sync/atomic"
     "testing"
@@ -31,6 +32,7 @@ import (
 
     "github.com/gogo/protobuf/proto"
     "github.com/golang/snappy"
+    "github.com/prometheus/client_golang/prometheus"
     client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/config"
@@ -60,7 +62,7 @@ func TestSampleDelivery(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.StoreSeries(series, 0)
@@ -89,7 +91,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.StoreSeries(series, 0)
     m.Start()
@@ -130,7 +132,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
     m.StoreSeries(series, 0)
 
@@ -149,7 +151,7 @@ func TestShutdown(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
     n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
@@ -188,7 +190,7 @@ func TestSeriesReset(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
     for i := 0; i < numSegments; i++ {
         series := []record.RefSeries{}
@@ -218,7 +220,7 @@ func TestReshard(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.StoreSeries(series, 0)
 
@@ -251,7 +253,7 @@ func TestReshardRaceWithStop(t *testing.T) {
 
     go func() {
         for {
-            metrics := newQueueManagerMetrics(nil)
+            metrics := newQueueManagerMetrics(nil, "", "")
             m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
             m.Start()
             h.Unlock()
@@ -268,7 +270,7 @@ func TestReshardRaceWithStop(t *testing.T) {
 }
 
 func TestReleaseNoninternedString(t *testing.T) {
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     c := NewTestStorageClient()
     m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
     m.Start()
@@ -316,7 +318,7 @@ func TestShouldReshard(t *testing.T) {
         },
     }
     for _, c := range cases {
-        metrics := newQueueManagerMetrics(nil)
+        metrics := newQueueManagerMetrics(nil, "", "")
         client := NewTestStorageClient()
         m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
         m.numShards = c.startingShards
@@ -524,7 +526,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
     testutil.Ok(b, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.StoreSeries(series, 0)
 
@@ -565,7 +567,7 @@ func BenchmarkStartup(b *testing.B) {
     logger = log.With(logger, "caller", log.DefaultCaller)
 
     for n := 0; n < b.N; n++ {
-        metrics := newQueueManagerMetrics(nil)
+        metrics := newQueueManagerMetrics(nil, "", "")
         c := NewTestBlockedStorageClient()
         m := NewQueueManager(metrics, nil, nil, logger, dir,
             newEWMARate(ewmaWeight, shardUpdateDuration),
@@ -616,7 +618,7 @@ func TestCalculateDesiredShards(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    metrics := newQueueManagerMetrics(nil)
+    metrics := newQueueManagerMetrics(nil, "", "")
     samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
     m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
 
@@ -649,7 +651,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 
         // highest sent is how far back pending samples would be at our input rate.
         highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
-        m.highestSentTimestampMetric.Set(float64(highestSent.Unix()))
+        m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
 
         atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix())
     }
@@ -685,3 +687,19 @@ func TestCalculateDesiredShards(t *testing.T) {
     }
     testutil.Assert(t, pendingSamples == 0, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
 }
+
+func TestQueueManagerMetrics(t *testing.T) {
+    reg := prometheus.NewPedanticRegistry()
+    metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")
+
+    // Make sure metrics pass linting.
+    problems, err := client_testutil.GatherAndLint(reg)
+    testutil.Ok(t, err)
+    testutil.Equals(t, 0, len(problems), "Metric linting problems detected: %v", problems)
+
+    // Make sure all metrics were unregistered. A failure here means you need
+    // to unregister a metric in `queueManagerMetrics.unregister()`.
+    metrics.unregister()
+    err = client_testutil.GatherAndCompare(reg, strings.NewReader(""))
+    testutil.Ok(t, err)
+}
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 57df73bbb6..c712397363 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -47,9 +47,9 @@ var (
 // WriteStorage represents all the remote write storage.
 type WriteStorage struct {
     logger log.Logger
+    reg    prometheus.Registerer
     mtx    sync.Mutex
 
-    queueMetrics      *queueManagerMetrics
     watcherMetrics    *wal.WatcherMetrics
     liveReaderMetrics *wal.LiveReaderMetrics
     externalLabels    labels.Labels
@@ -66,10 +66,10 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
     }
     rws := &WriteStorage{
         queues:            make(map[string]*QueueManager),
-        queueMetrics:      newQueueManagerMetrics(reg),
         watcherMetrics:    wal.NewWatcherMetrics(reg),
         liveReaderMetrics: wal.NewLiveReaderMetrics(reg),
         logger:            logger,
+        reg:               reg,
         flushDeadline:     flushDeadline,
         samplesIn:         newEWMARate(ewmaWeight, shardUpdateDuration),
         walDir:            walDir,
@@ -136,8 +136,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
             continue
         }
 
+        endpoint := rwConf.URL.String()
         newQueues[hash] = NewQueueManager(
-            rws.queueMetrics,
+            newQueueManagerMetrics(rws.reg, name, endpoint),
             rws.watcherMetrics,
             rws.liveReaderMetrics,
             rws.logger,
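Taken together, the three files move remote-write queue metrics from shared *Vec collectors keyed by (remote_name, url) label pairs to one collector set per queue, with those values baked in as const labels. A rough lifecycle sketch under assumed placeholder values (the remote name, URL, time.Minute deadline, and nil dependencies mirror the test setup above rather than production wiring):

// exampleLifecycle is a hypothetical helper inside package remote sketching
// the new flow; reg, client, and dir are supplied by the caller.
func exampleLifecycle(reg prometheus.Registerer, client StorageClient, dir string) {
    // Metrics are now created per queue, with identity as const labels.
    metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")

    m := NewQueueManager(metrics, nil, nil, nil, dir,
        newEWMARate(ewmaWeight, shardUpdateDuration),
        config.DefaultQueueConfig, nil, nil, client, time.Minute)
    m.Start()
    // ... the queue ships samples to the remote endpoint ...
    m.Stop() // internally calls t.metrics.unregister()

    // Because Stop unregisters the collectors, a later ApplyConfig can
    // register a fresh queueManagerMetrics for the same remote name and
    // endpoint without MustRegister panicking on a duplicate.
}

This replaces the old WithLabelValues setup in Start and the DeleteLabelValues teardown in Stop: each manager owns its collectors outright, so Start and Stop no longer have to track which label combinations exist on shared vecs.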