Pending Samples metric includes samples in channel (#7335)

* Pending Samples metric includes samples in channel

The pending samples metric should also include samples waiting in the
channels to be sent, so that it provides a more accurate measure. In
addition, make sure that the pending samples metric is reset to 0 any time
a queue is started, as we remake all of the shards at that time.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>

* Log the number of dropped samples on hard shutdown

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2020-06-25 14:48:30 -06:00 committed by GitHub
parent ec45e3d029
commit d78656c244
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -360,7 +360,6 @@ outer:
func (t *QueueManager) Start() { func (t *QueueManager) Start() {
// Initialise some metrics. // Initialise some metrics.
t.metrics.shardCapacity.Set(float64(t.cfg.Capacity)) t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
t.metrics.pendingSamples.Set(0)
t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards)) t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))
t.metrics.minNumShards.Set(float64(t.cfg.MinShards)) t.metrics.minNumShards.Set(float64(t.cfg.MinShards))
t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards)) t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards))
@ -668,6 +667,7 @@ type shards struct {
// Hard shutdown context is used to terminate outgoing HTTP connections // Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate. // after giving them a chance to terminate.
hardShutdown context.CancelFunc hardShutdown context.CancelFunc
droppedOnHardShutdown uint32
} }
// start the shards; must be called before any call to enqueue. // start the shards; must be called before any call to enqueue.
@ -675,6 +675,9 @@ func (s *shards) start(n int) {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
s.qm.metrics.pendingSamples.Set(0)
s.qm.metrics.numShards.Set(float64(n))
newQueues := make([]chan sample, n) newQueues := make([]chan sample, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
newQueues[i] = make(chan sample, s.qm.cfg.Capacity) newQueues[i] = make(chan sample, s.qm.cfg.Capacity)
@ -687,10 +690,10 @@ func (s *shards) start(n int) {
s.softShutdown = make(chan struct{}) s.softShutdown = make(chan struct{})
s.running = int32(n) s.running = int32(n)
s.done = make(chan struct{}) s.done = make(chan struct{})
atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i]) go s.runShard(hardShutdownCtx, i, newQueues[i])
} }
s.qm.metrics.numShards.Set(float64(n))
} }
// stop the shards; subsequent call to enqueue will return false. // stop the shards; subsequent call to enqueue will return false.
@ -715,12 +718,14 @@ func (s *shards) stop() {
case <-s.done: case <-s.done:
return return
case <-time.After(s.qm.flushDeadline): case <-time.After(s.qm.flushDeadline):
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown")
} }
// Force an unclean shutdown. // Force an unclean shutdown.
s.hardShutdown() s.hardShutdown()
<-s.done <-s.done
if dropped := atomic.LoadUint32(&s.droppedOnHardShutdown); dropped > 0 {
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
}
} }
// enqueue a sample. If we are currently in the process of shutting down or resharding, // enqueue a sample. If we are currently in the process of shutting down or resharding,
@ -740,6 +745,7 @@ func (s *shards) enqueue(ref uint64, sample sample) bool {
case <-s.softShutdown: case <-s.softShutdown:
return false return false
case s.queues[shard] <- sample: case s.queues[shard] <- sample:
s.qm.metrics.pendingSamples.Inc()
return true return true
} }
} }
@ -777,6 +783,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// In this case we drop all samples in the buffer and the queue.
// Remove them from pending and mark them as failed.
droppedSamples := nPending + len(queue)
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples))
return return
case sample, ok := <-queue: case sample, ok := <-queue:
@ -797,7 +809,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
pendingSamples[nPending].Samples[0].Timestamp = sample.t pendingSamples[nPending].Samples[0].Timestamp = sample.t
pendingSamples[nPending].Samples[0].Value = sample.v pendingSamples[nPending].Samples[0].Value = sample.v
nPending++ nPending++
s.qm.metrics.pendingSamples.Inc()
if nPending >= max { if nPending >= max {
s.sendSamples(ctx, pendingSamples, &buf) s.sendSamples(ctx, pendingSamples, &buf)