Remove duplicate metrics in QueueManager

Right now, any new metric added for remote write needs to be added to
both the QueueManager struct and the queueManagerMetrics struct.
Instead, use the queueManagerMetrics struct directly from QueueManager.

The newQueueManagerMetrics constructor now creates the metrics for a
specific queue, with the remote name and endpoint pre-populated as
constant labels, and a fresh copy of the struct is created for each queue.

This also fixes a bug where prometheus_remote_storage_sent_bytes_total
was not being unregistered after a queue was changed.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
Author: Chris Marchbanks
Date:   2020-04-25 03:39:46 +00:00
Commit: dfad1da296 (parent 532f7bbac9)
3 changed files with 198 additions and 235 deletions
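
To make the change easier to follow, here is a minimal, self-contained sketch of the lifecycle this commit adopts. The identifiers below (queueMetrics, newQueueMetrics, the example_* metric names, and the remote_name/url label keys) are illustrative stand-ins rather than the exact names used in Prometheus: each queue builds its own metrics with the remote name and endpoint baked in as constant labels, registers them when the struct is constructed, and unregisters them when the queue goes away.

// Sketch only: names are hypothetical, the pattern mirrors the commit.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// queueMetrics bundles the metrics for one remote-write queue. Because the
// remote name and endpoint are ConstLabels, plain Counters and Gauges can
// replace shared *Vec types.
type queueMetrics struct {
	reg       prometheus.Registerer
	sentBytes prometheus.Counter
	shards    prometheus.Gauge
}

func newQueueMetrics(reg prometheus.Registerer, remoteName, endpoint string) *queueMetrics {
	constLabels := prometheus.Labels{"remote_name": remoteName, "url": endpoint}
	m := &queueMetrics{
		reg: reg,
		sentBytes: prometheus.NewCounter(prometheus.CounterOpts{
			Name:        "example_remote_storage_sent_bytes_total",
			Help:        "Bytes sent by this queue.",
			ConstLabels: constLabels,
		}),
		shards: prometheus.NewGauge(prometheus.GaugeOpts{
			Name:        "example_remote_storage_shards",
			Help:        "Shards run by this queue.",
			ConstLabels: constLabels,
		}),
	}
	if reg != nil {
		reg.MustRegister(m.sentBytes, m.shards)
	}
	return m
}

// unregister removes every metric of this queue, so a reconfigured queue
// leaves no stale series behind.
func (m *queueMetrics) unregister() {
	if m.reg != nil {
		m.reg.Unregister(m.sentBytes)
		m.reg.Unregister(m.shards)
	}
}

func main() {
	reg := prometheus.NewRegistry()

	// One metrics struct per queue; a config reload creates a new one...
	m := newQueueMetrics(reg, "queue-1", "http://remote:9201/write")
	m.sentBytes.Add(1024)
	m.shards.Set(4)

	// ...and the old queue unregisters its metrics when it is stopped.
	m.unregister()
	fmt.Println("metrics registered and cleaned up")
}

Because every metric carries the queue's identity as constant labels, a configuration reload can simply drop the old struct and register a fresh one, which is what fixes the leaked sent_bytes_total series described above.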


@@ -46,152 +46,134 @@ const (
 )
 
 type queueManagerMetrics struct {
-	succeededSamplesTotal     *prometheus.CounterVec
-	failedSamplesTotal        *prometheus.CounterVec
-	retriedSamplesTotal       *prometheus.CounterVec
-	droppedSamplesTotal       *prometheus.CounterVec
-	enqueueRetriesTotal       *prometheus.CounterVec
-	sentBatchDuration         *prometheus.HistogramVec
-	queueHighestSentTimestamp *prometheus.GaugeVec
-	queuePendingSamples       *prometheus.GaugeVec
-	shardCapacity             *prometheus.GaugeVec
-	numShards                 *prometheus.GaugeVec
-	maxNumShards              *prometheus.GaugeVec
-	minNumShards              *prometheus.GaugeVec
-	desiredNumShards          *prometheus.GaugeVec
-	bytesSent                 *prometheus.CounterVec
+	reg prometheus.Registerer
+
+	succeededSamplesTotal prometheus.Counter
+	failedSamplesTotal    prometheus.Counter
+	retriedSamplesTotal   prometheus.Counter
+	droppedSamplesTotal   prometheus.Counter
+	enqueueRetriesTotal   prometheus.Counter
+	sentBatchDuration     prometheus.Histogram
+	highestSentTimestamp  *maxGauge
+	pendingSamples        prometheus.Gauge
+	shardCapacity         prometheus.Gauge
+	numShards             prometheus.Gauge
+	maxNumShards          prometheus.Gauge
+	minNumShards          prometheus.Gauge
+	desiredNumShards      prometheus.Gauge
+	bytesSent             prometheus.Counter
 }
 
-func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
-	m := &queueManagerMetrics{}
+func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics {
+	m := &queueManagerMetrics{
+		reg: r,
+	}
+	constLabels := prometheus.Labels{
+		remoteName: rn,
+		endpoint:   e,
+	}
 
-	m.succeededSamplesTotal = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "succeeded_samples_total",
-			Help:      "Total number of samples successfully sent to remote storage.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.failedSamplesTotal = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "failed_samples_total",
-			Help:      "Total number of samples which failed on send to remote storage, non-recoverable errors.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.retriedSamplesTotal = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "retried_samples_total",
-			Help:      "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.droppedSamplesTotal = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "dropped_samples_total",
-			Help:      "Total number of samples which were dropped after being read from the WAL before being sent via remote write.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.enqueueRetriesTotal = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "enqueue_retries_total",
-			Help:      "Total number of times enqueue has failed because a shards queue was full.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.sentBatchDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "sent_batch_duration_seconds",
-			Help:      "Duration of sample batch send calls to the remote storage.",
-			Buckets:   prometheus.DefBuckets,
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.queueHighestSentTimestamp = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "queue_highest_sent_timestamp_seconds",
-			Help:      "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.queuePendingSamples = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "pending_samples",
-			Help:      "The number of samples pending in the queues shards to be sent to the remote storage.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.shardCapacity = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "shard_capacity",
-			Help:      "The capacity of each shard of the queue used for parallel sending to the remote storage.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.numShards = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "shards",
-			Help:      "The number of shards used for parallel sending to the remote storage.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.maxNumShards = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "shards_max",
-			Help:      "The maximum number of shards that the queue is allowed to run.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.minNumShards = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "shards_min",
-			Help:      "The minimum number of shards that the queue is allowed to run.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.desiredNumShards = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "shards_desired",
-			Help:      "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
-		},
-		[]string{remoteName, endpoint},
-	)
-	m.bytesSent = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "sent_bytes_total",
-			Help:      "The total number of bytes sent by the queue.",
-		},
-		[]string{remoteName, endpoint},
-	)
+	m.succeededSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "succeeded_samples_total",
+		Help:        "Total number of samples successfully sent to remote storage.",
+		ConstLabels: constLabels,
+	})
+	m.failedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "failed_samples_total",
+		Help:        "Total number of samples which failed on send to remote storage, non-recoverable errors.",
+		ConstLabels: constLabels,
+	})
+	m.retriedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "retried_samples_total",
+		Help:        "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.",
+		ConstLabels: constLabels,
+	})
+	m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "dropped_samples_total",
+		Help:        "Total number of samples which were dropped after being read from the WAL before being sent via remote write.",
+		ConstLabels: constLabels,
+	})
+	m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "enqueue_retries_total",
+		Help:        "Total number of times enqueue has failed because a shards queue was full.",
+		ConstLabels: constLabels,
+	})
+	m.sentBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "sent_batch_duration_seconds",
+		Help:        "Duration of sample batch send calls to the remote storage.",
+		Buckets:     prometheus.DefBuckets,
+		ConstLabels: constLabels,
+	})
+	m.highestSentTimestamp = &maxGauge{
+		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace:   namespace,
+			Subsystem:   subsystem,
+			Name:        "queue_highest_sent_timestamp_seconds",
+			Help:        "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.",
+			ConstLabels: constLabels,
+		}),
+	}
+	m.pendingSamples = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "pending_samples",
+		Help:        "The number of samples pending in the queues shards to be sent to the remote storage.",
+		ConstLabels: constLabels,
+	})
+	m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "shard_capacity",
+		Help:        "The capacity of each shard of the queue used for parallel sending to the remote storage.",
+		ConstLabels: constLabels,
+	})
+	m.numShards = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "shards",
+		Help:        "The number of shards used for parallel sending to the remote storage.",
+		ConstLabels: constLabels,
+	})
+	m.maxNumShards = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "shards_max",
+		Help:        "The maximum number of shards that the queue is allowed to run.",
+		ConstLabels: constLabels,
+	})
+	m.minNumShards = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "shards_min",
+		Help:        "The minimum number of shards that the queue is allowed to run.",
+		ConstLabels: constLabels,
+	})
+	m.desiredNumShards = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "shards_desired",
+		Help:        "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
+		ConstLabels: constLabels,
+	})
+	m.bytesSent = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace:   namespace,
+		Subsystem:   subsystem,
+		Name:        "sent_bytes_total",
+		Help:        "The total number of bytes sent by the queue.",
+		ConstLabels: constLabels,
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -201,8 +183,8 @@ func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
 			m.droppedSamplesTotal,
 			m.enqueueRetriesTotal,
 			m.sentBatchDuration,
-			m.queueHighestSentTimestamp,
-			m.queuePendingSamples,
+			m.highestSentTimestamp,
+			m.pendingSamples,
 			m.shardCapacity,
 			m.numShards,
 			m.maxNumShards,
@@ -214,6 +196,25 @@ func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
 	return m
 }
 
+func (m *queueManagerMetrics) unregister() {
+	if m.reg != nil {
+		m.reg.Unregister(m.succeededSamplesTotal)
+		m.reg.Unregister(m.failedSamplesTotal)
+		m.reg.Unregister(m.retriedSamplesTotal)
+		m.reg.Unregister(m.droppedSamplesTotal)
+		m.reg.Unregister(m.enqueueRetriesTotal)
+		m.reg.Unregister(m.sentBatchDuration)
+		m.reg.Unregister(m.highestSentTimestamp)
+		m.reg.Unregister(m.pendingSamples)
+		m.reg.Unregister(m.shardCapacity)
+		m.reg.Unregister(m.numShards)
+		m.reg.Unregister(m.maxNumShards)
+		m.reg.Unregister(m.minNumShards)
+		m.reg.Unregister(m.desiredNumShards)
+		m.reg.Unregister(m.bytesSent)
+	}
+}
+
 // StorageClient defines an interface for sending a batch of samples to an
 // external timeseries database.
 type StorageClient interface {
@@ -255,25 +256,23 @@ type QueueManager struct {
 	samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
 
-	metrics                    *queueManagerMetrics
-	highestSentTimestampMetric *maxGauge
-	pendingSamplesMetric       prometheus.Gauge
-	enqueueRetriesMetric       prometheus.Counter
-	droppedSamplesTotal        prometheus.Counter
-	numShardsMetric            prometheus.Gauge
-	failedSamplesTotal         prometheus.Counter
-	sentBatchDuration          prometheus.Observer
-	succeededSamplesTotal      prometheus.Counter
-	retriedSamplesTotal        prometheus.Counter
-	shardCapacity              prometheus.Gauge
-	maxNumShards               prometheus.Gauge
-	minNumShards               prometheus.Gauge
-	desiredNumShards           prometheus.Gauge
-	bytesSent                  prometheus.Counter
+	metrics *queueManagerMetrics
 }
 
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
+func NewQueueManager(
+	metrics *queueManagerMetrics,
+	watcherMetrics *wal.WatcherMetrics,
+	readerMetrics *wal.LiveReaderMetrics,
+	logger log.Logger,
+	walDir string,
+	samplesIn *ewmaRate,
+	cfg config.QueueConfig,
+	externalLabels labels.Labels,
+	relabelConfigs []*relabel.Config,
+	client StorageClient,
+	flushDeadline time.Duration,
+) *QueueManager {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -317,7 +316,7 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
-			t.droppedSamplesTotal.Inc()
+			t.metrics.droppedSamplesTotal.Inc()
 			t.samplesDropped.incr(1)
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
@@ -343,7 +342,7 @@ outer:
 				continue outer
 			}
 
-			t.enqueueRetriesMetric.Inc()
+			t.metrics.enqueueRetriesTotal.Inc()
 			time.Sleep(time.Duration(backoff))
 			backoff = backoff * 2
 			if backoff > t.cfg.MaxBackoff {
@@ -357,34 +356,12 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
-	// Setup the QueueManagers metrics. We do this here rather than in the
-	// constructor because of the ordering of creating Queue Managers's, stopping them,
-	// and then starting new ones in storage/remote/storage.go ApplyConfig.
-	name := t.client().Name()
-	ep := t.client().Endpoint()
-	t.highestSentTimestampMetric = &maxGauge{
-		Gauge: t.metrics.queueHighestSentTimestamp.WithLabelValues(name, ep),
-	}
-	t.pendingSamplesMetric = t.metrics.queuePendingSamples.WithLabelValues(name, ep)
-	t.enqueueRetriesMetric = t.metrics.enqueueRetriesTotal.WithLabelValues(name, ep)
-	t.droppedSamplesTotal = t.metrics.droppedSamplesTotal.WithLabelValues(name, ep)
-	t.numShardsMetric = t.metrics.numShards.WithLabelValues(name, ep)
-	t.failedSamplesTotal = t.metrics.failedSamplesTotal.WithLabelValues(name, ep)
-	t.sentBatchDuration = t.metrics.sentBatchDuration.WithLabelValues(name, ep)
-	t.succeededSamplesTotal = t.metrics.succeededSamplesTotal.WithLabelValues(name, ep)
-	t.retriedSamplesTotal = t.metrics.retriedSamplesTotal.WithLabelValues(name, ep)
-	t.shardCapacity = t.metrics.shardCapacity.WithLabelValues(name, ep)
-	t.maxNumShards = t.metrics.maxNumShards.WithLabelValues(name, ep)
-	t.minNumShards = t.metrics.minNumShards.WithLabelValues(name, ep)
-	t.desiredNumShards = t.metrics.desiredNumShards.WithLabelValues(name, ep)
-	t.bytesSent = t.metrics.bytesSent.WithLabelValues(name, ep)
-
 	// Initialise some metrics.
-	t.shardCapacity.Set(float64(t.cfg.Capacity))
-	t.pendingSamplesMetric.Set(0)
-	t.maxNumShards.Set(float64(t.cfg.MaxShards))
-	t.minNumShards.Set(float64(t.cfg.MinShards))
-	t.desiredNumShards.Set(float64(t.cfg.MinShards))
+	t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
+	t.metrics.pendingSamples.Set(0)
+	t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))
+	t.metrics.minNumShards.Set(float64(t.cfg.MinShards))
+	t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards))
 
 	t.shards.start(t.numShards)
 	t.watcher.Start()
@@ -414,22 +391,7 @@ func (t *QueueManager) Stop() {
 		releaseLabels(labels)
 	}
 	t.seriesMtx.Unlock()
-	// Delete metrics so we don't have alerts for queues that are gone.
-	name := t.client().Name()
-	ep := t.client().Endpoint()
-	t.metrics.queueHighestSentTimestamp.DeleteLabelValues(name, ep)
-	t.metrics.queuePendingSamples.DeleteLabelValues(name, ep)
-	t.metrics.enqueueRetriesTotal.DeleteLabelValues(name, ep)
-	t.metrics.droppedSamplesTotal.DeleteLabelValues(name, ep)
-	t.metrics.numShards.DeleteLabelValues(name, ep)
-	t.metrics.failedSamplesTotal.DeleteLabelValues(name, ep)
-	t.metrics.sentBatchDuration.DeleteLabelValues(name, ep)
-	t.metrics.succeededSamplesTotal.DeleteLabelValues(name, ep)
-	t.metrics.retriedSamplesTotal.DeleteLabelValues(name, ep)
-	t.metrics.shardCapacity.DeleteLabelValues(name, ep)
-	t.metrics.maxNumShards.DeleteLabelValues(name, ep)
-	t.metrics.minNumShards.DeleteLabelValues(name, ep)
-	t.metrics.desiredNumShards.DeleteLabelValues(name, ep)
+
+	t.metrics.unregister()
 }
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
@@ -597,7 +559,7 @@ func (t *QueueManager) calculateDesiredShards() int {
 		samplesKeptRatio   = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate)
 		samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
 		samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
-		highestSent        = t.highestSentTimestampMetric.Get()
+		highestSent        = t.metrics.highestSentTimestamp.Get()
 		highestRecv        = highestTimestamp.Get()
 		delay              = highestRecv - highestSent
 		samplesPending     = delay * samplesInRate * samplesKeptRatio
@@ -616,7 +578,7 @@ func (t *QueueManager) calculateDesiredShards() int {
 		timePerSample = samplesOutDuration / samplesOutRate
 		desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
 	)
-	t.desiredNumShards.Set(desiredShards)
+	t.metrics.desiredNumShards.Set(desiredShards)
 	level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
 		"samplesInRate", samplesInRate,
 		"samplesOutRate", samplesOutRate,
@@ -726,7 +688,7 @@ func (s *shards) start(n int) {
 	for i := 0; i < n; i++ {
 		go s.runShard(hardShutdownCtx, i, newQueues[i])
 	}
-	s.qm.numShardsMetric.Set(float64(n))
+	s.qm.metrics.numShards.Set(float64(n))
 }
 
 // stop the shards; subsequent call to enqueue will return false.
@@ -820,7 +782,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 			if nPending > 0 {
 				level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", nPending)
 				s.sendSamples(ctx, pendingSamples[:nPending], &buf)
-				s.qm.pendingSamplesMetric.Sub(float64(nPending))
+				s.qm.metrics.pendingSamples.Sub(float64(nPending))
 				level.Debug(s.qm.logger).Log("msg", "Done flushing.")
 			}
 			return
@@ -833,12 +795,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 			pendingSamples[nPending].Samples[0].Timestamp = sample.t
 			pendingSamples[nPending].Samples[0].Value = sample.v
 			nPending++
-			s.qm.pendingSamplesMetric.Inc()
+			s.qm.metrics.pendingSamples.Inc()
 
 			if nPending >= max {
 				s.sendSamples(ctx, pendingSamples, &buf)
 				nPending = 0
-				s.qm.pendingSamplesMetric.Sub(float64(max))
+				s.qm.metrics.pendingSamples.Sub(float64(max))
 
 				stop()
 				timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -848,7 +810,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 			if nPending > 0 {
 				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", nPending, "shard", shardNum)
 				s.sendSamples(ctx, pendingSamples[:nPending], &buf)
-				s.qm.pendingSamplesMetric.Sub(float64(nPending))
+				s.qm.metrics.pendingSamples.Sub(float64(nPending))
 				nPending = 0
 			}
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -861,7 +823,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
 	err := s.sendSamplesWithBackoff(ctx, samples, buf)
 	if err != nil {
 		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
-		s.qm.failedSamplesTotal.Add(float64(len(samples)))
+		s.qm.metrics.failedSamplesTotal.Add(float64(len(samples)))
 	}
 
 	// These counters are used to calculate the dynamic sharding, and as such
@@ -891,19 +853,19 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		begin := time.Now()
 		err := s.qm.client().Store(ctx, req)
 
-		s.qm.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
 
 		if err == nil {
-			s.qm.succeededSamplesTotal.Add(float64(len(samples)))
-			s.qm.bytesSent.Add(float64(len(req)))
-			s.qm.highestSentTimestampMetric.Set(float64(highest / 1000))
+			s.qm.metrics.succeededSamplesTotal.Add(float64(len(samples)))
+			s.qm.metrics.bytesSent.Add(float64(len(req)))
+			s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
 			return nil
 		}
 
 		if _, ok := err.(recoverableError); !ok {
 			return err
 		}
 
-		s.qm.retriedSamplesTotal.Add(float64(len(samples)))
+		s.qm.metrics.retriedSamplesTotal.Add(float64(len(samples)))
 		level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
 		time.Sleep(time.Duration(backoff))


@@ -60,7 +60,7 @@ func TestSampleDelivery(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
@@ -89,7 +89,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	m.Start()
@@ -130,7 +130,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
@@ -149,7 +149,7 @@ func TestShutdown(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
 	n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
@@ -188,7 +188,7 @@ func TestSeriesReset(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
 	for i := 0; i < numSegments; i++ {
 		series := []record.RefSeries{}
@@ -218,7 +218,7 @@ func TestReshard(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
@@ -251,7 +251,7 @@ func TestReshardRaceWithStop(t *testing.T) {
 	go func() {
 		for {
-			metrics := newQueueManagerMetrics(nil)
+			metrics := newQueueManagerMetrics(nil, "", "")
 			m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 			m.Start()
 			h.Unlock()
@@ -268,7 +268,7 @@
 }
 
 func TestReleaseNoninternedString(t *testing.T) {
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	c := NewTestStorageClient()
 	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.Start()
@@ -316,7 +316,7 @@ func TestShouldReshard(t *testing.T) {
 		},
 	}
 	for _, c := range cases {
-		metrics := newQueueManagerMetrics(nil)
+		metrics := newQueueManagerMetrics(nil, "", "")
 		client := NewTestStorageClient()
 		m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
 		m.numShards = c.startingShards
@@ -524,7 +524,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	testutil.Ok(b, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
@@ -565,7 +565,7 @@ func BenchmarkStartup(b *testing.B) {
 	logger = log.With(logger, "caller", log.DefaultCaller)
 
 	for n := 0; n < b.N; n++ {
-		metrics := newQueueManagerMetrics(nil)
+		metrics := newQueueManagerMetrics(nil, "", "")
 		c := NewTestBlockedStorageClient()
 		m := NewQueueManager(metrics, nil, nil, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
@@ -616,7 +616,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	metrics := newQueueManagerMetrics(nil)
+	metrics := newQueueManagerMetrics(nil, "", "")
 	samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
 	m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
@@ -649,7 +649,7 @@
 		// highest sent is how far back pending samples would be at our input rate.
 		highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
-		m.highestSentTimestampMetric.Set(float64(highestSent.Unix()))
+		m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
 
 		atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix())
 	}


@@ -47,9 +47,9 @@ var (
 // WriteStorage represents all the remote write storage.
 type WriteStorage struct {
 	logger log.Logger
+	reg    prometheus.Registerer
 	mtx    sync.Mutex
-	queueMetrics      *queueManagerMetrics
 	watcherMetrics    *wal.WatcherMetrics
 	liveReaderMetrics *wal.LiveReaderMetrics
 	externalLabels    labels.Labels
@@ -66,10 +66,10 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
 	}
 	rws := &WriteStorage{
 		queues:            make(map[string]*QueueManager),
-		queueMetrics:      newQueueManagerMetrics(reg),
 		watcherMetrics:    wal.NewWatcherMetrics(reg),
 		liveReaderMetrics: wal.NewLiveReaderMetrics(reg),
 		logger:            logger,
+		reg:               reg,
 		flushDeadline:     flushDeadline,
 		samplesIn:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		walDir:            walDir,
@@ -136,8 +136,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			continue
 		}
 
+		endpoint := rwConf.URL.String()
 		newQueues[hash] = NewQueueManager(
-			rws.queueMetrics,
+			newQueueManagerMetrics(rws.reg, name, endpoint),
 			rws.watcherMetrics,
 			rws.liveReaderMetrics,
 			rws.logger,