mirror of https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00

refactor queue manager code to remove some duplication

parent 3be59f0ca6
commit 2f815ee3dd
@@ -1490,10 +1490,31 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) {
 	}
 	return nPendingSamples, nPendingExemplars, nPendingHistograms
 }
 
 func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
 	begin := time.Now()
-	err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf)
+	// Build the WriteRequest with no metadata.
+	// Failing to build the write request is non-recoverable, since it will
+	// only error if marshaling the proto to bytes fails.
+	req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
+	if err == nil {
+		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
+	}
+	s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
+}
+
+func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
+	begin := time.Now()
+	// Build the ReducedWriteRequest with no metadata.
+	// Failing to build the write request is non-recoverable, since it will
+	// only error if marshaling the proto to bytes fails.
+	req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
+	if err == nil {
+		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
+	}
+	s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
+}
+
+func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) {
 	if err != nil {
 		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
 		s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
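Both send paths above now reduce to the same three steps: marshal the request, hand the raw bytes to sendSamplesWithBackoff only if marshaling succeeded, and always report the outcome through updateMetrics. The following self-contained Go sketch mirrors that control flow; the sender type and its methods are hypothetical stand-ins for the queue manager, not the real API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sender is a stand-in for *shards; its methods fake the real send/metrics hooks.
type sender struct{}

func (s *sender) sendWithBackoff(ctx context.Context, raw []byte, highest int64) error {
	return nil // pretend the remote write succeeded
}

func (s *sender) updateMetrics(err error, duration time.Duration) {
	if err != nil {
		fmt.Println("non-recoverable error:", err, "after", duration)
	}
}

// send mirrors the structure of sendSamples/sendReducedSamples in the hunk:
// marshal first, send only if marshaling succeeded, always update metrics.
func (s *sender) send(ctx context.Context, marshal func() ([]byte, int64, error)) {
	begin := time.Now()
	raw, highest, err := marshal()
	if err == nil {
		err = s.sendWithBackoff(ctx, raw, highest)
	}
	s.updateMetrics(err, time.Since(begin))
}

func main() {
	s := &sender{}
	s.send(context.Background(), func() ([]byte, int64, error) {
		return []byte("payload"), time.Now().UnixMilli(), nil
	})
	s.send(context.Background(), func() ([]byte, int64, error) {
		return nil, 0, errors.New("marshal failed") // the non-recoverable path
	})
}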
@@ -1503,8 +1524,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
 
 	// These counters are used to calculate the dynamic sharding, and as such
 	// should be maintained irrespective of success or failure.
-	s.qm.dataOut.incr(int64(len(samples)))
-	s.qm.dataOutDuration.incr(int64(time.Since(begin)))
+	s.qm.dataOut.incr(int64(sampleCount + exemplarCount + histogramCount))
+	s.qm.dataOutDuration.incr(int64(duration))
 	s.qm.lastSendTimestamp.Store(time.Now().Unix())
 	// Pending samples/exemplars/histograms also should be subtracted, as an error means
 	// they will not be retried.
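The dataOut and dataOutDuration counters updated in this hunk feed the dynamic-sharding calculation referenced in the comment; counting samples, exemplars, and histograms (rather than series, as len(samples) did) makes the rate reflect actual items sent. The sketch below shows the rough idea under that assumption; the real calculateDesiredShards in queue_manager.go adds smoothing, backlog catch-up, and bounds, so treat this as illustrative only.

package main

import "fmt"

// desiredShards estimates how many shards are needed to keep up: the average
// send time per outgoing item, times the incoming item rate, gives the number
// of concurrent senders required. All inputs are per-second rates.
func desiredShards(dataInPerSec, dataOutPerSec, sendSecondsPerSec float64) float64 {
	if dataOutPerSec <= 0 {
		return 1
	}
	timePerItem := sendSecondsPerSec / dataOutPerSec // seconds spent sending one item
	return timePerItem * dataInPerSec
}

func main() {
	// Sending 10k items/s while spending 2s of send time per wall-clock second,
	// with 15k items/s arriving, suggests 3 shards.
	fmt.Println(desiredShards(15000, 10000, 2.0))
}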
@@ -1517,18 +1538,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
 }
 
 // sendSamples to the remote storage with backoff for recoverable errors.
-func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
-	// Build the WriteRequest with no metadata.
-	req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
-	if err != nil {
-		// Failing to build the write request is non-recoverable, since it will
-		// only error if marshaling the proto to bytes fails.
-		return err
-	}
-
-	reqSize := len(req)
-	*buf = req
-
+func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, sampleCount, exemplarCount, histogramCount int, highestTimestamp int64) error {
+	reqSize := len(rawReq)
 	// An anonymous function allows us to defer the completion of our per-try spans
 	// without causing a memory leak, and it has the nice effect of not propagating any
 	// parameters for sendSamplesWithBackoff/3.
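After this hunk, sendSamplesWithBackoff no longer depends on prompb at all: it takes pre-marshaled bytes plus the pre-computed highest timestamp, which is what lets one retry loop serve both WriteRequest and ReducedWriteRequest payloads. For context, a buildWriteRequest-style helper has roughly the shape below; this is a simplified sketch (the real helper also scans exemplar and histogram timestamps and reuses pBuf and buf to avoid allocations).

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

// buildRequest tracks the highest sample timestamp (for the
// highest_sent_timestamp metric), marshals the proto, and snappy-compresses
// the result as the remote-write wire format requires.
func buildRequest(samples []prompb.TimeSeries) ([]byte, int64, error) {
	var highest int64
	for _, ts := range samples {
		for _, sample := range ts.Samples {
			if sample.Timestamp > highest {
				highest = sample.Timestamp
			}
		}
	}
	data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: samples})
	if err != nil {
		return nil, highest, err
	}
	return snappy.Encode(nil, data), highest, nil
}

func main() {
	raw, highest, err := buildRequest(nil)
	fmt.Println(len(raw), highest, err)
}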
@@ -1555,7 +1566,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
 		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
 		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
 		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		err := s.qm.client().Store(ctx, *buf, try)
+		err := s.qm.client().Store(ctx, rawReq, try)
 		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
 
 		if err != nil {
@@ -1572,7 +1583,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
 		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
 	}
 
-	err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
+	err := sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
 	if errors.Is(err, context.Canceled) {
 		// When there is resharding, we cancel the context for this queue, which means the data is not sent.
 		// So we exit early to not update the metrics.
@@ -1580,7 +1591,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
 	}
 
 	s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
-	s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
+	s.qm.metrics.highestSentTimestamp.Set(float64(highestTimestamp / 1000))
 
 	return err
 }
@@ -1637,100 +1648,6 @@ func (s *shards) populateReducedTimeSeries(pool *lookupPool, batch []timeSeries,
 	return nPendingSamples, nPendingExemplars, nPendingHistograms
 }
 
-func (s *shards) sendReducedSamples(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
-	begin := time.Now()
-	err := s.sendReducedSamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, pBuf, buf)
-	if err != nil {
-		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
-		s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
-	}
-
-	// These counters are used to calculate the dynamic sharding, and as such
-	// should be maintained irrespective of success or failure.
-	s.qm.dataOut.incr(int64(len(samples)))
-	s.qm.dataOutDuration.incr(int64(time.Since(begin)))
-	s.qm.lastSendTimestamp.Store(time.Now().Unix())
-	// Pending samples/exemplars/histograms also should be subtracted as an error means
-	// they will not be retried.
-	s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
-	s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount))
-	s.qm.metrics.pendingHistograms.Sub(float64(histogramCount))
-	s.enqueuedSamples.Sub(int64(sampleCount))
-	s.enqueuedExemplars.Sub(int64(exemplarCount))
-	s.enqueuedHistograms.Sub(int64(histogramCount))
-}
-
-// sendSamples to the remote storage with backoff for recoverable errors.
-func (s *shards) sendReducedSamplesWithBackoff(ctx context.Context, samples []prompb.ReducedTimeSeries, labels map[uint64]string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
-	// Build the WriteRequest with no metadata.
-	req, highest, err := buildReducedWriteRequest(samples, labels, pBuf, *buf)
-	if err != nil {
-		// Failing to build the write request is non-recoverable, since it will
-		// only error if marshaling the proto to bytes fails.
-		return err
-	}
-
-	reqSize := len(req)
-	*buf = req
-
-	// An anonymous function allows us to defer the completion of our per-try spans
-	// without causing a memory leak, and it has the nice effect of not propagating any
-	// parameters for sendSamplesWithBackoff/3.
-	attemptStore := func(try int) error {
-		ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
-		defer span.End()
-
-		span.SetAttributes(
-			attribute.Int("request_size", reqSize),
-			attribute.Int("samples", sampleCount),
-			attribute.Int("try", try),
-			attribute.String("remote_name", s.qm.storeClient.Name()),
-			attribute.String("remote_url", s.qm.storeClient.Endpoint()),
-		)
-
-		if exemplarCount > 0 {
-			span.SetAttributes(attribute.Int("exemplars", exemplarCount))
-		}
-		if histogramCount > 0 {
-			span.SetAttributes(attribute.Int("histograms", histogramCount))
-		}
-
-		begin := time.Now()
-		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		err := s.qm.client().Store(ctx, *buf, try)
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
-
-		if err != nil {
-			span.RecordError(err)
-			return err
-		}
-
-		return nil
-	}
-
-	onRetry := func() {
-		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
-	}
-
-	err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
-	if errors.Is(err, context.Canceled) {
-		// When there is resharding, we cancel the context for this queue, which means the data is not sent.
-		// So we exit early to not update the metrics.
-		return err
-	}
-
-	s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
-	s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
-
-	return err
-}
-
 func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
 	backoff := cfg.MinBackoff
 	sleepDuration := model.Duration(0)
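With the reduced-samples copy deleted, sendWriteRequestWithBackoff (whose opening lines appear in the trailing context) is the single retry loop left. The sketch below illustrates its attempt/onRetry contract under simplifying assumptions: unlike the real implementation, it treats every error as recoverable and hard-codes the backoff bounds instead of reading them from config.QueueConfig.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls attempt with the try number; on failure it sleeps
// with exponentially growing backoff, invokes onRetry, and tries again until
// the attempt succeeds or the context is canceled.
func retryWithBackoff(ctx context.Context, minBackoff, maxBackoff time.Duration,
	attempt func(int) error, onRetry func()) error {
	backoff := minBackoff
	for try := 0; ; try++ {
		if err := attempt(try); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		onRetry()
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

func main() {
	tries := 0
	err := retryWithBackoff(context.Background(), 50*time.Millisecond, time.Second,
		func(try int) error {
			tries++
			if try < 2 {
				return errors.New("transient")
			}
			return nil
		},
		func() { fmt.Println("retrying") },
	)
	fmt.Println(tries, err) // 3 <nil>
}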