Merge pull request #9830 from prometheus/batch-queues

Batch samples before sending them to channels
Chris Marchbanks 2021-12-02 08:37:41 -07:00 committed by GitHub
commit e95d4ec3f1

@@ -915,7 +915,7 @@ type shards struct {
mtx sync.RWMutex // With the WAL, this is never actually contended.
qm *QueueManager
queues []chan interface{}
queues []*queue
// So we can accurately track how many of each are lost during shard shutdowns.
enqueuedSamples atomic.Int64
enqueuedExemplars atomic.Int64
@@ -943,9 +943,9 @@ func (s *shards) start(n int) {
s.qm.metrics.pendingSamples.Set(0)
s.qm.metrics.numShards.Set(float64(n))
newQueues := make([]chan interface{}, n)
newQueues := make([]*queue, n)
for i := 0; i < n; i++ {
newQueues[i] = make(chan interface{}, s.qm.cfg.Capacity)
newQueues[i] = newQueue(s.qm.cfg.MaxSamplesPerSend, s.qm.cfg.Capacity)
}
s.queues = newQueues
@@ -978,7 +978,7 @@ func (s *shards) stop() {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, queue := range s.queues {
close(queue)
go queue.FlushAndShutdown(s.done)
}
select {
case <-s.done:
@@ -1013,7 +1013,11 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool {
select {
case <-s.softShutdown:
return false
case s.queues[shard] <- data:
default:
appended := s.queues[shard].Append(data, s.softShutdown)
if !appended {
return false
}
switch data.(type) {
case writeSample:
s.qm.metrics.pendingSamples.Inc()
@@ -1028,7 +1032,93 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool {
}
}
func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}) {
type queue struct {
batch []interface{}
batchQueue chan []interface{}
// Since we know there are a limited number of batches out, using a stack
// is easy and safe so a sync.Pool is not necessary.
batchPool [][]interface{}
// This mutex covers adding and removing batches from the batchPool.
poolMux sync.Mutex
}
func newQueue(batchSize, capacity int) *queue {
batches := capacity / batchSize
return &queue{
batch: make([]interface{}, 0, batchSize),
batchQueue: make(chan []interface{}, batches),
// batchPool should have capacity for everything in the channel + 1 for
// the batch being processed.
batchPool: make([][]interface{}, 0, batches+1),
}
}
func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool {
q.batch = append(q.batch, datum)
if len(q.batch) == cap(q.batch) {
select {
case q.batchQueue <- q.batch:
q.batch = q.newBatch(cap(q.batch))
return true
case <-stop:
// Remove the sample we just appended. It will get retried.
q.batch = q.batch[:len(q.batch)-1]
return false
}
}
return true
}
func (q *queue) Chan() <-chan []interface{} {
return q.batchQueue
}
// Batch returns the current batch and allocates a new batch. Must not be
// called concurrently with Append.
func (q *queue) Batch() []interface{} {
batch := q.batch
q.batch = q.newBatch(cap(batch))
return batch
}
// ReturnForReuse adds the batch buffer back to the internal pool.
func (q *queue) ReturnForReuse(batch []interface{}) {
q.poolMux.Lock()
defer q.poolMux.Unlock()
if len(q.batchPool) < cap(q.batchPool) {
q.batchPool = append(q.batchPool, batch[:0])
}
}
// FlushAndShutdown stops the queue and flushes any samples. No appends can be
// made after this is called.
func (q *queue) FlushAndShutdown(done <-chan struct{}) {
if len(q.batch) > 0 {
select {
case q.batchQueue <- q.batch:
case <-done:
// The shard has been hard shut down, so no more samples can be
// sent. Drop everything left in the queue.
}
}
q.batch = nil
close(q.batchQueue)
}
func (q *queue) newBatch(capacity int) []interface{} {
q.poolMux.Lock()
defer q.poolMux.Unlock()
batches := len(q.batchPool)
if batches > 0 {
batch := q.batchPool[batches-1]
q.batchPool = q.batchPool[:batches-1]
return batch
}
return make([]interface{}, 0, capacity)
}
func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
defer func() {
if s.running.Dec() == 0 {
close(s.done)
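The runShard hunks that follow consume these batches: full batches arriving on queue.Chan() are sent immediately, while the BatchSendDeadline timer grabs whatever partial batch has accumulated so that quiet shards still flush in time. A rough standalone sketch of that select/timer pattern (hypothetical names, int items instead of the real sample types):

package main

import (
	"fmt"
	"time"
)

func main() {
	batches := make(chan []int, 1)
	partial := []int{4} // an item that will never fill a whole batch

	batches <- []int{1, 2, 3} // one full batch is already waiting
	go func() {
		time.Sleep(120 * time.Millisecond)
		close(batches) // stands in for FlushAndShutdown closing the channel
	}()

	deadline := 50 * time.Millisecond
	timer := time.NewTimer(deadline)
	defer timer.Stop()
	for {
		select {
		case batch, ok := <-batches:
			if !ok {
				return // the queue has been shut down and flushed
			}
			fmt.Println("send full batch:", batch)
		case <-timer.C:
			if len(partial) > 0 {
				fmt.Println("deadline flush of partial batch:", partial)
				partial = partial[:0]
			}
			timer.Reset(deadline)
		}
	}
}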
@@ -1041,7 +1131,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
// If we have fewer samples than that, flush them out after a deadline anyways.
var (
max = s.qm.cfg.MaxSamplesPerSend
nPending, nPendingSamples, nPendingExemplars = 0, 0, 0
pBuf = proto.NewBuffer(nil)
buf []byte
@@ -1050,6 +1139,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
max += int(float64(max) * 0.1)
}
batchQueue := queue.Chan()
pendingData := make([]prompb.TimeSeries, max)
for i := range pendingData {
pendingData[i].Samples = []prompb.Sample{{}}
@@ -1074,8 +1164,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
case <-ctx.Done():
// In this case we drop all samples in the buffer and the queue.
// Remove them from pending and mark them as failed.
droppedSamples := nPendingSamples + int(s.enqueuedSamples.Load())
droppedExemplars := nPendingExemplars + int(s.enqueuedExemplars.Load())
droppedSamples := int(s.enqueuedSamples.Load())
droppedExemplars := int(s.enqueuedExemplars.Load())
s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars))
s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
@@ -1084,18 +1174,46 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars))
return
case sample, ok := <-queue:
case batch, ok := <-batchQueue:
if !ok {
if nPendingSamples > 0 || nPendingExemplars > 0 {
level.Debug(s.qm.logger).Log("msg", "Flushing data to remote storage...", "samples", nPendingSamples, "exemplars", nPendingExemplars)
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
level.Debug(s.qm.logger).Log("msg", "Done flushing.")
}
return
}
nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf)
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
case <-timer.C:
// We need to take the lock when getting a batch to avoid
// concurrent Appends. Generally this will only happen on low
// traffic instances.
s.mtx.Lock()
// First, check whether a batch happened to fill up and land on the queue while we were acquiring the lock.
var batch []interface{}
select {
case batch = <-batchQueue:
default:
batch = queue.Batch()
}
s.mtx.Unlock()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf)
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
}
}
func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.TimeSeries) (int, int) {
var nPendingSamples, nPendingExemplars int
for nPending, sample := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
@@ -1108,40 +1226,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample)
nPendingSamples++
nPending++
case writeExemplar:
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar)
nPendingExemplars++
nPending++
}
if nPending >= max {
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
nPendingSamples = 0
nPendingExemplars = 0
nPending = 0
stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
case <-timer.C:
if nPendingSamples > 0 || nPendingExemplars > 0 {
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
nPendingSamples = 0
nPendingExemplars = 0
nPending = 0
}
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
}
return nPendingSamples, nPendingExemplars
}
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) {
@@ -1158,6 +1249,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.qm.dataOut.incr(int64(len(samples)))
s.qm.dataOutDuration.incr(int64(time.Since(begin)))
s.qm.lastSendTimestamp.Store(time.Now().Unix())
// Pending samples/exemplars also should be subtracted as an error means
// they will not be retried.
s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
s.enqueuedSamples.Sub(int64(sampleCount))
s.enqueuedExemplars.Sub(int64(exemplarCount))
}
// sendSamples to the remote storage with backoff for recoverable errors.
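One design note on the queue type added above: only a bounded number of batch buffers can ever be in flight (everything buffered in the channel plus the one being filled), so finished buffers are recycled through a small mutex-guarded stack (batchPool) instead of a sync.Pool. A standalone sketch of that reuse pattern, under hypothetical names:

package main

import (
	"fmt"
	"sync"
)

type bufferPool struct {
	mu    sync.Mutex
	stack [][]int
}

// get pops a recycled buffer if one is available, otherwise allocates a new one.
func (p *bufferPool) get(capacity int) []int {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.stack); n > 0 {
		b := p.stack[n-1]
		p.stack = p.stack[:n-1]
		return b
	}
	return make([]int, 0, capacity)
}

// put pushes a finished buffer back, keeping its capacity but dropping its contents.
func (p *bufferPool) put(b []int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.stack = append(p.stack, b[:0])
}

func main() {
	p := &bufferPool{}
	b := p.get(4)
	b = append(b, 1, 2, 3)
	fmt.Println(b, cap(b)) // [1 2 3] 4
	p.put(b)
	b2 := p.get(4) // the same underlying array comes back, emptied
	fmt.Println(len(b2), cap(b2)) // 0 4
}

Because the buffer count is bounded, a plain stack gives the same reuse as sync.Pool without its GC interaction, which matches the reasoning in the in-code comment above newQueue.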