diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b33230518..dc2be46e9 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -915,7 +915,7 @@ type shards struct { mtx sync.RWMutex // With the WAL, this is never actually contended. qm *QueueManager - queues []chan interface{} + queues []*queue // So we can accurately track how many of each are lost during shard shutdowns. enqueuedSamples atomic.Int64 enqueuedExemplars atomic.Int64 @@ -943,9 +943,9 @@ func (s *shards) start(n int) { s.qm.metrics.pendingSamples.Set(0) s.qm.metrics.numShards.Set(float64(n)) - newQueues := make([]chan interface{}, n) + newQueues := make([]*queue, n) for i := 0; i < n; i++ { - newQueues[i] = make(chan interface{}, s.qm.cfg.Capacity) + newQueues[i] = newQueue(s.qm.cfg.MaxSamplesPerSend, s.qm.cfg.Capacity) } s.queues = newQueues @@ -978,7 +978,7 @@ func (s *shards) stop() { s.mtx.Lock() defer s.mtx.Unlock() for _, queue := range s.queues { - close(queue) + go queue.FlushAndShutdown(s.done) } select { case <-s.done: @@ -1013,7 +1013,11 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool { select { case <-s.softShutdown: return false - case s.queues[shard] <- data: + default: + appended := s.queues[shard].Append(data, s.softShutdown) + if !appended { + return false + } switch data.(type) { case writeSample: s.qm.metrics.pendingSamples.Inc() @@ -1028,7 +1032,93 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool { } } -func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface{}) { +type queue struct { + batch []interface{} + batchQueue chan []interface{} + + // Since we know there are a limited number of batches out, using a stack + // is easy and safe so a sync.Pool is not necessary. + batchPool [][]interface{} + // This mutex covers adding and removing batches from the batchPool. + poolMux sync.Mutex +} + +func newQueue(batchSize, capacity int) *queue { + batches := capacity / batchSize + return &queue{ + batch: make([]interface{}, 0, batchSize), + batchQueue: make(chan []interface{}, batches), + // batchPool should have capacity for everything in the channel + 1 for + // the batch being processed. + batchPool: make([][]interface{}, 0, batches+1), + } +} + +func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool { + q.batch = append(q.batch, datum) + if len(q.batch) == cap(q.batch) { + select { + case q.batchQueue <- q.batch: + q.batch = q.newBatch(cap(q.batch)) + return true + case <-stop: + // Remove the sample we just appended. It will get retried. + q.batch = q.batch[:len(q.batch)-1] + return false + } + } + return true +} + +func (q *queue) Chan() <-chan []interface{} { + return q.batchQueue +} + +// Batch returns the current batch and allocates a new batch. Must not be +// called concurrently with Append. +func (q *queue) Batch() []interface{} { + batch := q.batch + q.batch = q.newBatch(cap(batch)) + return batch +} + +// ReturnForReuse adds the batch buffer back to the internal pool. +func (q *queue) ReturnForReuse(batch []interface{}) { + q.poolMux.Lock() + defer q.poolMux.Unlock() + if len(q.batchPool) < cap(q.batchPool) { + q.batchPool = append(q.batchPool, batch[:0]) + } +} + +// FlushAndShutdown stops the queue and flushes any samples. No appends can be +// made after this is called. +func (q *queue) FlushAndShutdown(done <-chan struct{}) { + if len(q.batch) > 0 { + select { + case q.batchQueue <- q.batch: + case <-done: + // The shard has been hard shut down, so no more samples can be + // sent. Drop everything left in the queue. + } + } + q.batch = nil + close(q.batchQueue) +} + +func (q *queue) newBatch(capacity int) []interface{} { + q.poolMux.Lock() + defer q.poolMux.Unlock() + batches := len(q.batchPool) + if batches > 0 { + batch := q.batchPool[batches-1] + q.batchPool = q.batchPool[:batches-1] + return batch + } + return make([]interface{}, 0, capacity) +} + +func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { defer func() { if s.running.Dec() == 0 { close(s.done) @@ -1040,8 +1130,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline anyways. var ( - max = s.qm.cfg.MaxSamplesPerSend - nPending, nPendingSamples, nPendingExemplars = 0, 0, 0 + max = s.qm.cfg.MaxSamplesPerSend pBuf = proto.NewBuffer(nil) buf []byte @@ -1050,6 +1139,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface max += int(float64(max) * 0.1) } + batchQueue := queue.Chan() pendingData := make([]prompb.TimeSeries, max) for i := range pendingData { pendingData[i].Samples = []prompb.Sample{{}} @@ -1074,8 +1164,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface case <-ctx.Done(): // In this case we drop all samples in the buffer and the queue. // Remove them from pending and mark them as failed. - droppedSamples := nPendingSamples + int(s.enqueuedSamples.Load()) - droppedExemplars := nPendingExemplars + int(s.enqueuedExemplars.Load()) + droppedSamples := int(s.enqueuedSamples.Load()) + droppedExemplars := int(s.enqueuedExemplars.Load()) s.qm.metrics.pendingSamples.Sub(float64(droppedSamples)) s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars)) s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples)) @@ -1084,66 +1174,67 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars)) return - case sample, ok := <-queue: + case batch, ok := <-batchQueue: if !ok { - if nPendingSamples > 0 || nPendingExemplars > 0 { - level.Debug(s.qm.logger).Log("msg", "Flushing data to remote storage...", "samples", nPendingSamples, "exemplars", nPendingExemplars) - s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) - level.Debug(s.qm.logger).Log("msg", "Done flushing.") - } return } + nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) + queue.ReturnForReuse(batch) + n := nPendingSamples + nPendingExemplars + s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) - pendingData[nPending].Samples = pendingData[nPending].Samples[:0] - if s.qm.sendExemplars { - pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] - } - // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) - // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll - // stop reading from the queue. This makes it safe to reference pendingSamples by index. - switch d := sample.(type) { - case writeSample: - pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) - pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample) - nPendingSamples++ - nPending++ - - case writeExemplar: - pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) - pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar) - nPendingExemplars++ - nPending++ - } - - if nPending >= max { - s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) - nPendingSamples = 0 - nPendingExemplars = 0 - nPending = 0 - - stop() - timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) - } + stop() + timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) case <-timer.C: - if nPendingSamples > 0 || nPendingExemplars > 0 { - level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) - s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) - nPendingSamples = 0 - nPendingExemplars = 0 - nPending = 0 + // We need to take the lock when getting a batch to avoid + // concurrent Appends. Generally this will only happen on low + // traffic instances. + s.mtx.Lock() + // First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock. + var batch []interface{} + select { + case batch = <-batchQueue: + default: + batch = queue.Batch() } + s.mtx.Unlock() + if len(batch) > 0 { + nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) + n := nPendingSamples + nPendingExemplars + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) + s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) + } + queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) } } } +func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.TimeSeries) (int, int) { + var nPendingSamples, nPendingExemplars int + for nPending, sample := range batch { + pendingData[nPending].Samples = pendingData[nPending].Samples[:0] + if s.qm.sendExemplars { + pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] + } + // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) + // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll + // stop reading from the queue. This makes it safe to reference pendingSamples by index. + switch d := sample.(type) { + case writeSample: + pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample) + nPendingSamples++ + case writeExemplar: + pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar) + nPendingExemplars++ + } + } + return nPendingSamples, nPendingExemplars +} + func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, pBuf, buf) @@ -1158,6 +1249,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.qm.dataOut.incr(int64(len(samples))) s.qm.dataOutDuration.incr(int64(time.Since(begin))) s.qm.lastSendTimestamp.Store(time.Now().Unix()) + // Pending samples/exemplars also should be subtracted as an error means + // they will not be retried. + s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) + s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) + s.enqueuedSamples.Sub(int64(sampleCount)) + s.enqueuedExemplars.Sub(int64(exemplarCount)) } // sendSamples to the remote storage with backoff for recoverable errors.