diff --git a/storage/remote/client.go b/storage/remote/client.go
index 13502f75e..e122dcbf4 100644
--- a/storage/remote/client.go
+++ b/storage/remote/client.go
@@ -148,8 +148,11 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
 	}, nil
 }
 
+const defaultBackoff = 0
+
 type RecoverableError struct {
 	error
+	retryAfter model.Duration
 }
 
 // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
@@ -188,7 +191,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 	if err != nil {
 		// Errors from Client.Do are from (for example) network errors, so are
 		// recoverable.
-		return RecoverableError{err}
+		return RecoverableError{err, defaultBackoff}
 	}
 	defer func() {
 		io.Copy(ioutil.Discard, httpResp.Body)
@@ -204,11 +207,30 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 		err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
 	}
 	if httpResp.StatusCode/100 == 5 {
-		return RecoverableError{err}
+		return RecoverableError{err, defaultBackoff}
+	}
+	if httpResp.StatusCode == http.StatusTooManyRequests {
+		return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
 	}
 	return err
 }
 
+// retryAfterDuration returns the duration for the Retry-After header. In case of any errors, it
+// returns the defaultBackoff as if the header was never supplied.
+func retryAfterDuration(t string) model.Duration {
+	parsedDuration, err := time.Parse(http.TimeFormat, t)
+	if err == nil {
+		s := time.Until(parsedDuration).Seconds()
+		return model.Duration(s) * model.Duration(time.Second)
+	}
+	// The duration can be in seconds.
+	d, err := strconv.Atoi(t)
+	if err != nil {
+		return defaultBackoff
+	}
+	return model.Duration(d) * model.Duration(time.Second)
+}
+
 // Name uniquely identifies the client.
 func (c Client) Name() string {
 	return c.remoteName
diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go
index 93a4c59c7..82067d3af 100644
--- a/storage/remote/client_test.go
+++ b/storage/remote/client_test.go
@@ -49,7 +49,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
 		},
 		{
 			code: 500,
-			err:  RecoverableError{errors.New("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])},
+			err:  RecoverableError{errors.New("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen]), defaultBackoff},
 		},
 	}
 
@@ -83,3 +83,30 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
 		server.Close()
 	}
 }
+
+func TestRetryAfterDuration(t *testing.T) {
+	tc := []struct {
+		name     string
+		tInput   string
+		expected model.Duration
+	}{
+		{
+			name:     "seconds",
+			tInput:   "120",
+			expected: model.Duration(time.Second * 120),
+		},
+		{
+			name:     "date-time default",
+			tInput:   time.RFC1123, // Expected layout is http.TimeFormat, hence an error.
+			expected: defaultBackoff,
+		},
+		{
+			name:     "retry-after not provided",
+			tInput:   "", // No Retry-After header provided, hence the default backoff.
+			expected: defaultBackoff,
+		},
+	}
+	for _, c := range tc {
+		require.Equal(t, c.expected, retryAfterDuration(c.tInput), c.name)
+	}
+}
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 4e9067659..a3eb35e46 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -29,6 +29,7 @@ import (
 	"go.uber.org/atomic"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
@@ -1042,6 +1043,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 
 func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
 	backoff := cfg.MinBackoff
+	sleepDuration := model.Duration(0)
 	try := 0
 
 	for {
@@ -1058,16 +1060,29 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
 		}
 
 		// If the error is unrecoverable, we should not retry.
-		if _, ok := err.(RecoverableError); !ok {
+		backoffErr, ok := err.(RecoverableError)
+		if !ok {
 			return err
 		}
 
+		sleepDuration = backoff
+		if backoffErr.retryAfter > 0 {
+			sleepDuration = backoffErr.retryAfter
+			level.Info(l).Log("msg", "Retrying after duration specified by Retry-After header", "duration", sleepDuration)
+		} else if backoffErr.retryAfter < 0 {
+			level.Debug(l).Log("msg", "retry-after cannot be in the past, retrying using default backoff mechanism")
+		}
+
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Duration(sleepDuration)):
+		}
+
 		// If we make it this far, we've encountered a recoverable error and will retry.
 		onRetry()
 		level.Warn(l).Log("msg", "Failed to send batch, retrying", "err", err)
 
-		time.Sleep(time.Duration(backoff))
-		backoff = backoff * 2
+		backoff = sleepDuration * 2
+
 		if backoff > cfg.MaxBackoff {
 			backoff = cfg.MaxBackoff
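
For reviewers, a minimal standalone sketch (not part of the patch) of the two Retry-After formats the new retryAfterDuration helper accepts: an integer number of seconds and an HTTP-date in http.TimeFormat. It mirrors the logic added in client.go but returns a plain time.Duration to stay dependency-free; the names parseRetryAfter and defaultRetryBackoff are illustrative, not from the Prometheus codebase.

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// defaultRetryBackoff plays the role of defaultBackoff in the patch: zero means
// "no usable Retry-After", so the caller falls back to its own backoff.
const defaultRetryBackoff = 0

// parseRetryAfter mirrors retryAfterDuration: try an HTTP-date first, then a
// plain number of seconds, and fall back to the default on any parse error.
func parseRetryAfter(v string) time.Duration {
	if deadline, err := time.Parse(http.TimeFormat, v); err == nil {
		return time.Until(deadline).Truncate(time.Second)
	}
	if s, err := strconv.Atoi(v); err == nil {
		return time.Duration(s) * time.Second
	}
	return defaultRetryBackoff
}

func main() {
	fmt.Println(parseRetryAfter("120"))          // 2m0s
	fmt.Println(parseRetryAfter(time.Now().Add(30 * time.Second).UTC().Format(http.TimeFormat))) // roughly 29s
	fmt.Println(parseRetryAfter("not-a-header")) // 0s, i.e. fall back to the normal backoff
}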
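
Similarly, a hedged sketch of how the reworked retry loop in queue_manager.go chooses its sleep interval: a positive retryAfter carried by the RecoverableError overrides the current exponential backoff, and the next backoff doubles from whatever interval was actually used, capped at the configured maximum. The constants and the nextSleep helper below are stand-ins for cfg.MinBackoff / cfg.MaxBackoff and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

// Illustrative stand-ins for cfg.MinBackoff and cfg.MaxBackoff.
const (
	minBackoff = 30 * time.Millisecond
	maxBackoff = 5 * time.Second
)

// nextSleep returns how long to wait before the next retry and the backoff to
// carry into the following attempt, following the same rules as the patch:
// a server-provided Retry-After (retryAfter > 0) wins over the exponential
// backoff, and the next backoff doubles from the interval actually slept.
func nextSleep(backoff, retryAfter time.Duration) (sleep, next time.Duration) {
	sleep = backoff
	if retryAfter > 0 {
		sleep = retryAfter
	}
	next = sleep * 2
	if next > maxBackoff {
		next = maxBackoff
	}
	return sleep, next
}

func main() {
	backoff := minBackoff
	// Simulate four failed attempts; the third one carries Retry-After: 2 seconds.
	for _, retryAfter := range []time.Duration{0, 0, 2 * time.Second, 0} {
		var sleep time.Duration
		sleep, backoff = nextSleep(backoff, retryAfter)
		fmt.Printf("sleep %v, next backoff %v\n", sleep, backoff)
	}
	// Prints: 30ms/60ms, 60ms/120ms, 2s/4s, 4s/5s (capped at maxBackoff).
}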