From 3268eac2ddda4300f962213feaf6b763a0ecd295 Mon Sep 17 00:00:00 2001
From: Cody Boggs
Date: Mon, 1 Jun 2020 09:21:13 -0600
Subject: [PATCH] Trace Remote Write requests (#7206)

* Trace Remote Write requests

Signed-off-by: Cody Boggs

* Refactor store attempts to keep code flow clearer, and avoid so many
places to deal with span finishing

Signed-off-by: Cody Boggs
---
 storage/remote/client.go        | 16 ++++
 storage/remote/queue_manager.go | 79 ++++++++++++++++++++++++---------
 2 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/storage/remote/client.go b/storage/remote/client.go
index 55ceb2341..4dce3ec50 100644
--- a/storage/remote/client.go
+++ b/storage/remote/client.go
@@ -92,11 +92,23 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 	httpReq.Header.Set("Content-Type", "application/x-protobuf")
 	httpReq.Header.Set("User-Agent", userAgent)
 	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
-
 	ctx, cancel := context.WithTimeout(ctx, c.timeout)
 	defer cancel()
 
-	httpResp, err := c.client.Do(httpReq.WithContext(ctx))
+	httpReq = httpReq.WithContext(ctx)
+
+	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
+		var ht *nethttp.Tracer
+		httpReq, ht = nethttp.TraceRequest(
+			parentSpan.Tracer(),
+			httpReq,
+			nethttp.OperationName("Remote Store"),
+			nethttp.ClientTrace(false),
+		)
+		defer ht.Finish()
+	}
+
+	httpResp, err := c.client.Do(httpReq)
 	if err != nil {
 		// Errors from client.Do are from (for example) network errors, so are
 		// recoverable.
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index fbdd28d8d..6a8b4c432 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -25,6 +25,8 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/prometheus/prometheus/config"
@@ -835,44 +837,79 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
 
 // sendSamples to the remote storage with backoff for recoverable errors.
 func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {
-	backoff := s.qm.cfg.MinBackoff
 	req, highest, err := buildWriteRequest(samples, *buf)
-	*buf = req
 	if err != nil {
 		// Failing to build the write request is non-recoverable, since it will
 		// only error if marshaling the proto to bytes fails.
 		return err
 	}
 
+	backoff := s.qm.cfg.MinBackoff
+	reqSize := len(*buf)
+	sampleCount := len(samples)
+	*buf = req
+	try := 0
+
+	// An anonymous function allows us to defer the completion of our per-try spans
+	// without causing a memory leak, and it has the nice effect of not propagating any
+	// parameters for sendSamplesWithBackoff/3.
+	attemptStore := func() error {
+		span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch")
+		defer span.Finish()
+
+		span.SetTag("samples", sampleCount)
+		span.SetTag("request_size", reqSize)
+		span.SetTag("try", try)
+		span.SetTag("remote_name", s.qm.storeClient.Name())
+		span.SetTag("remote_url", s.qm.storeClient.Endpoint())
+
+		begin := time.Now()
+		err := s.qm.client().Store(ctx, *buf)
+		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+
+		if err != nil {
+			span.LogKV("error", err)
+			ext.Error.Set(span, true)
+			return err
+		}
+
+		return nil
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
 		}
 
-		begin := time.Now()
-		err := s.qm.client().Store(ctx, req)
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		err = attemptStore()
 
-		if err == nil {
-			s.qm.metrics.succeededSamplesTotal.Add(float64(len(samples)))
-			s.qm.metrics.bytesSent.Add(float64(len(req)))
-			s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
-			return nil
+		if err != nil {
+			// If the error is unrecoverable, we should not retry.
+			if _, ok := err.(recoverableError); !ok {
+				return err
+			}
+
+			// If we make it this far, we've encountered a recoverable error and will retry.
+			s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
+			level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
+			time.Sleep(time.Duration(backoff))
+			backoff = backoff * 2
+
+			if backoff > s.qm.cfg.MaxBackoff {
+				backoff = s.qm.cfg.MaxBackoff
+			}
+
+			try++
+			continue
 		}
 
-		if _, ok := err.(recoverableError); !ok {
-			return err
-		}
-		s.qm.metrics.retriedSamplesTotal.Add(float64(len(samples)))
-		level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
-
-		time.Sleep(time.Duration(backoff))
-		backoff = backoff * 2
-		if backoff > s.qm.cfg.MaxBackoff {
-			backoff = s.qm.cfg.MaxBackoff
-		}
+		// Since we retry forever on recoverable errors, this needs to stay inside the loop.
+		s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount))
+		s.qm.metrics.bytesSent.Add(float64(reqSize))
+		s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
+		return nil
 	}
 }
 
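Reviewer note (not part of the patch): the two changes compose as follows. sendSamplesWithBackoff opens a per-attempt "Remote Send Batch" span, and Client.Store, finding that span in the request context, wraps the outgoing HTTP request with nethttp.TraceRequest so the POST appears as a child client span. Below is a minimal, hypothetical Go sketch of that same pattern in isolation: the sendOnce helper, the endpoint URL, the timeout, and the tag values are illustrative assumptions rather than Prometheus code, and the nethttp.Transport wrapping shown in main is what go-stdlib's tracer needs on the client side but is not part of this diff.

package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/opentracing-contrib/go-stdlib/nethttp"
	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
)

// sendOnce mirrors the shape of Client.Store after this patch: attach the
// caller's context, and when a parent span is present, let nethttp trace the
// outgoing request as a child client span. (Hypothetical helper.)
func sendOnce(ctx context.Context, client *http.Client, url string, body []byte) error {
	httpReq, err := http.NewRequest("POST", url, bytes.NewReader(body))
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // timeout is illustrative
	defer cancel()
	httpReq = httpReq.WithContext(ctx)

	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
		var ht *nethttp.Tracer
		httpReq, ht = nethttp.TraceRequest(
			parentSpan.Tracer(),
			httpReq,
			nethttp.OperationName("Remote Store"),
			nethttp.ClientTrace(false), // one span per request, no low-level httptrace events
		)
		defer ht.Finish()
	}

	httpResp, err := client.Do(httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()
	return nil
}

func main() {
	// nethttp.TraceRequest relies on the client's RoundTripper being wrapped in
	// nethttp.Transport; without it no client span is emitted.
	client := &http.Client{Transport: &nethttp.Transport{RoundTripper: http.DefaultTransport}}

	// One per-attempt span, analogous to what attemptStore creates on every try.
	span, ctx := opentracing.StartSpanFromContext(context.Background(), "Remote Send Batch")
	defer span.Finish()
	span.SetTag("try", 0)

	if err := sendOnce(ctx, client, "http://localhost:9201/write", []byte("payload")); err != nil {
		span.LogKV("error", err)
		ext.Error.Set(span, true)
		fmt.Println("send failed:", err)
	}
}

With a real tracer registered via opentracing.SetGlobalTracer (for example a Jaeger client), each retry shows up as its own span with the HTTP call nested under it; with the default no-op tracer the sketch still compiles and runs, it just records nothing.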