Mirror of https://github.com/prometheus/prometheus.git
Trace Remote Write requests (#7206)
* Trace Remote Write requests

Signed-off-by: Cody Boggs <cboggs@splunk.com>

* Refactor store attempts to keep code flow clearer, and avoid so many places to deal with span finishing

Signed-off-by: Cody Boggs <cboggs@splunk.com>
This commit is contained in:
parent d6374ae1b6
commit 3268eac2dd
@@ -92,11 +92,23 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 	httpReq.Header.Set("Content-Type", "application/x-protobuf")
 	httpReq.Header.Set("User-Agent", userAgent)
 	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
 
 	ctx, cancel := context.WithTimeout(ctx, c.timeout)
 	defer cancel()
 
-	httpResp, err := c.client.Do(httpReq.WithContext(ctx))
+	httpReq = httpReq.WithContext(ctx)
+
+	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
+		var ht *nethttp.Tracer
+		httpReq, ht = nethttp.TraceRequest(
+			parentSpan.Tracer(),
+			httpReq,
+			nethttp.OperationName("Remote Store"),
+			nethttp.ClientTrace(false),
+		)
+		defer ht.Finish()
+	}
+
+	httpResp, err := c.client.Do(httpReq)
 	if err != nil {
 		// Errors from client.Do are from (for example) network errors, so are
 		// recoverable.
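For context, here is a minimal, self-contained sketch of the pattern the hunk above introduces in the remote-write client's Store path: when the incoming context already carries a span, the outgoing HTTP request is wrapped with nethttp.TraceRequest so the store call is recorded as a child span. This is not the Prometheus code itself; the helper name storeTraced, the 30-second timeout, and the response handling are illustrative placeholders.

package example

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/opentracing-contrib/go-stdlib/nethttp"
	"github.com/opentracing/opentracing-go"
)

// storeTraced posts a payload and, when a parent span is present, traces the request.
func storeTraced(ctx context.Context, client *http.Client, url string, payload []byte) error {
	httpReq, err := http.NewRequest("POST", url, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	httpReq.Header.Set("Content-Type", "application/x-protobuf")

	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	httpReq = httpReq.WithContext(ctx)

	// Only trace when the incoming context carries a parent span, as in the hunk above.
	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
		var ht *nethttp.Tracer
		httpReq, ht = nethttp.TraceRequest(
			parentSpan.Tracer(),
			httpReq,
			nethttp.OperationName("Remote Store"),
			nethttp.ClientTrace(false), // one span per request, not per low-level connection event
		)
		defer ht.Finish()
	}

	httpResp, err := client.Do(httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()
	_, _ = io.Copy(ioutil.Discard, httpResp.Body)

	if httpResp.StatusCode/100 != 2 {
		return fmt.Errorf("server returned HTTP status %s", httpResp.Status)
	}
	return nil
}

Note that nethttp.TraceRequest only produces a client-side span when the http.Client's transport is wrapped in a *nethttp.Transport, for example &http.Client{Transport: &nethttp.Transport{RoundTripper: http.DefaultTransport}}.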
@@ -25,6 +25,8 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/config"
@@ -835,44 +837,79 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
 
 // sendSamples to the remote storage with backoff for recoverable errors.
 func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {
-	backoff := s.qm.cfg.MinBackoff
 	req, highest, err := buildWriteRequest(samples, *buf)
-	*buf = req
 	if err != nil {
 		// Failing to build the write request is non-recoverable, since it will
 		// only error if marshaling the proto to bytes fails.
 		return err
 	}
 
+	backoff := s.qm.cfg.MinBackoff
+	reqSize := len(*buf)
+	sampleCount := len(samples)
+	*buf = req
+	try := 0
+
+	// An anonymous function allows us to defer the completion of our per-try spans
+	// without causing a memory leak, and it has the nice effect of not propagating any
+	// parameters for sendSamplesWithBackoff/3.
+	attemptStore := func() error {
+		span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch")
+		defer span.Finish()
+
+		span.SetTag("samples", sampleCount)
+		span.SetTag("request_size", reqSize)
+		span.SetTag("try", try)
+		span.SetTag("remote_name", s.qm.storeClient.Name())
+		span.SetTag("remote_url", s.qm.storeClient.Endpoint())
+
+		begin := time.Now()
+		err := s.qm.client().Store(ctx, *buf)
+		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+
+		if err != nil {
+			span.LogKV("error", err)
+			ext.Error.Set(span, true)
+			return err
+		}
+
+		return nil
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
 		}
-		begin := time.Now()
-		err := s.qm.client().Store(ctx, req)
-
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		err = attemptStore()
 
-		if err == nil {
-			s.qm.metrics.succeededSamplesTotal.Add(float64(len(samples)))
-			s.qm.metrics.bytesSent.Add(float64(len(req)))
-			s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
-			return nil
+		if err != nil {
+			// If the error is unrecoverable, we should not retry.
+			if _, ok := err.(recoverableError); !ok {
+				return err
+			}
+
+			// If we make it this far, we've encountered a recoverable error and will retry.
+			s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
+			level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
+			time.Sleep(time.Duration(backoff))
+			backoff = backoff * 2
+
+			if backoff > s.qm.cfg.MaxBackoff {
+				backoff = s.qm.cfg.MaxBackoff
+			}
+
+			try++
+			continue
 		}
 
-		if _, ok := err.(recoverableError); !ok {
-			return err
-		}
-		s.qm.metrics.retriedSamplesTotal.Add(float64(len(samples)))
-		level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
-
-		time.Sleep(time.Duration(backoff))
-		backoff = backoff * 2
-		if backoff > s.qm.cfg.MaxBackoff {
-			backoff = s.qm.cfg.MaxBackoff
-		}
+		// Since we retry forever on recoverable errors, this needs to stay inside the loop.
+		s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount))
+		s.qm.metrics.bytesSent.Add(float64(reqSize))
+		s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
+		return nil
 	}
 }
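The attemptStore closure above is what gives every retry its own span, its own tags, and its own deferred Finish. A rough, generic sketch of that pattern follows; it is not the queue-manager code, and retryWithSpans, send, and maxTries are made-up names. The point is that the deferred span.Finish() fires when the attempt's closure returns, rather than accumulating one deferred call per retry until the surrounding function exits.

package example

import (
	"context"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
)

// retryWithSpans calls send up to maxTries times, giving each attempt its own span.
func retryWithSpans(ctx context.Context, maxTries int, send func(context.Context) error) error {
	var err error
	for try := 0; try < maxTries; try++ {
		// The closure bounds the span's lifetime to a single attempt.
		attempt := func() error {
			span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch")
			defer span.Finish()

			span.SetTag("try", try)

			if err := send(ctx); err != nil {
				// Record the failure on the span and flag it as errored.
				span.LogKV("error", err)
				ext.Error.Set(span, true)
				return err
			}
			return nil
		}

		if err = attempt(); err == nil {
			return nil
		}
	}
	return err
}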
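The other half of the new sendSamplesWithBackoff is the retry policy around attemptStore: retry only errors wrapped in recoverableError, sleep with an exponentially growing backoff capped at a maximum, and check the context before every attempt so a shutdown aborts the loop. The standalone sketch below mirrors that policy; sendWithBackoff and the concrete min/max backoff values are illustrative, and recoverableError is modeled on the type used in the diff.

package example

import (
	"context"
	"time"
)

// recoverableError marks errors (for example, transient network failures) that are worth retrying.
type recoverableError struct{ error }

// sendWithBackoff keeps calling attempt until it succeeds, the context is
// cancelled, or an unrecoverable error is returned.
func sendWithBackoff(ctx context.Context, attempt func(context.Context) error) error {
	const (
		minBackoff = 30 * time.Millisecond
		maxBackoff = 100 * time.Millisecond
	)
	backoff := minBackoff

	for {
		// Give up promptly if the caller cancels or a shutdown is in progress.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		err := attempt(ctx)
		if err == nil {
			return nil
		}

		// Anything that is not a recoverableError is permanent: stop retrying.
		if _, ok := err.(recoverableError); !ok {
			return err
		}

		// Recoverable: wait, then double the backoff up to the cap.
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}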