diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 806543f744..bd8cfdee6f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1490,10 +1490,31 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim } return nPendingSamples, nPendingExemplars, nPendingHistograms } - func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf) + // Build the WriteRequest with no metadata. + // Failing to build the write request is non-recoverable, since it will + // only error if marshaling the proto to bytes fails. + req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) + if err == nil { + err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) + } + s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) +} + +func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { + begin := time.Now() + // Build the ReducedWriteRequest with no metadata. + // Failing to build the write request is non-recoverable, since it will + // only error if marshaling the proto to bytes fails. + req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf) + if err == nil { + err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) + } + s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) +} + +func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) { if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) @@ -1503,8 +1524,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s // These counters are used to calculate the dynamic sharding, and as such // should be maintained irrespective of success or failure. - s.qm.dataOut.incr(int64(len(samples))) - s.qm.dataOutDuration.incr(int64(time.Since(begin))) + s.qm.dataOut.incr(int64(sampleCount + exemplarCount + histogramCount)) + s.qm.dataOutDuration.incr(int64(duration)) s.qm.lastSendTimestamp.Store(time.Now().Unix()) // Pending samples/exemplars/histograms also should be subtracted, as an error means // they will not be retried. @@ -1517,18 +1538,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { - // Build the WriteRequest with no metadata. - req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) - if err != nil { - // Failing to build the write request is non-recoverable, since it will - // only error if marshaling the proto to bytes fails. - return err - } - - reqSize := len(req) - *buf = req - +func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, sampleCount, exemplarCount, histogramCount int, highestTimestamp int64) error { + reqSize := len(rawReq) // An anonymous function allows us to defer the completion of our per-try spans // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. @@ -1555,7 +1566,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti s.qm.metrics.samplesTotal.Add(float64(sampleCount)) s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) - err := s.qm.client().Store(ctx, *buf, try) + err := s.qm.client().Store(ctx, rawReq, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { @@ -1572,7 +1583,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount)) } - err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) + err := sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) if errors.Is(err, context.Canceled) { // When there is resharding, we cancel the context for this queue, which means the data is not sent. // So we exit early to not update the metrics. @@ -1580,7 +1591,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti } s.qm.metrics.sentBytesTotal.Add(float64(reqSize)) - s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) + s.qm.metrics.highestSentTimestamp.Set(float64(highestTimestamp / 1000)) return err } @@ -1637,100 +1648,6 @@ func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries, return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { - begin := time.Now() - err := s.sendReducedSamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, pBuf, buf) - if err != nil { - level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) - s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) - } - - // These counters are used to calculate the dynamic sharding, and as such - // should be maintained irrespective of success or failure. - s.qm.dataOut.incr(int64(len(samples))) - s.qm.dataOutDuration.incr(int64(time.Since(begin))) - s.qm.lastSendTimestamp.Store(time.Now().Unix()) - // Pending samples/exemplars/histograms also should be subtracted as an error means - // they will not be retried. - s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) - s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) - s.qm.metrics.pendingHistograms.Sub(float64(histogramCount)) - s.enqueuedSamples.Sub(int64(sampleCount)) - s.enqueuedExemplars.Sub(int64(exemplarCount)) - s.enqueuedHistograms.Sub(int64(histogramCount)) -} - -// sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendReducedSamplesWithBackoff(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { - // Build the WriteRequest with no metadata. - req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf) - if err != nil { - // Failing to build the write request is non-recoverable, since it will - // only error if marshaling the proto to bytes fails. - return err - } - - reqSize := len(req) - *buf = req - - // An anonymous function allows us to defer the completion of our per-try spans - // without causing a memory leak, and it has the nice effect of not propagating any - // parameters for sendSamplesWithBackoff/3. - attemptStore := func(try int) error { - ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch") - defer span.End() - - span.SetAttributes( - attribute.Int("request_size", reqSize), - attribute.Int("samples", sampleCount), - attribute.Int("try", try), - attribute.String("remote_name", s.qm.storeClient.Name()), - attribute.String("remote_url", s.qm.storeClient.Endpoint()), - ) - - if exemplarCount > 0 { - span.SetAttributes(attribute.Int("exemplars", exemplarCount)) - } - if histogramCount > 0 { - span.SetAttributes(attribute.Int("histograms", histogramCount)) - } - - begin := time.Now() - s.qm.metrics.samplesTotal.Add(float64(sampleCount)) - s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) - err := s.qm.client().Store(ctx, *buf, try) - s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) - - if err != nil { - span.RecordError(err) - return err - } - - return nil - } - - onRetry := func() { - s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount)) - } - - err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) - if errors.Is(err, context.Canceled) { - // When there is resharding, we cancel the context for this queue, which means the data is not sent. - // So we exit early to not update the metrics. - return err - } - - s.qm.metrics.sentBytesTotal.Add(float64(reqSize)) - s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) - - return err -} - func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { backoff := cfg.MinBackoff sleepDuration := model.Duration(0)