mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Consider status code 429 as recoverable errors to avoid resharding (#8237)
This reverts commit 49a8ce5239.
This commit is necessary since we only wanted to not have the
functionality in 2.25. It will be improved soon on the main branch.
Co-authored-by: Harkishen-Singh <harkishensingh@hotmail.com>
Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
parent
526095f9fa
commit
93dcc3c7be
|
@ -148,8 +148,11 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const defaultBackoff = 0
|
||||||
|
|
||||||
type RecoverableError struct {
|
type RecoverableError struct {
|
||||||
error
|
error
|
||||||
|
retryAfter model.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
|
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
|
||||||
|
@ -188,7 +191,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Errors from Client.Do are from (for example) network errors, so are
|
// Errors from Client.Do are from (for example) network errors, so are
|
||||||
// recoverable.
|
// recoverable.
|
||||||
return RecoverableError{err}
|
return RecoverableError{err, defaultBackoff}
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
io.Copy(ioutil.Discard, httpResp.Body)
|
io.Copy(ioutil.Discard, httpResp.Body)
|
||||||
|
@ -204,11 +207,30 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
|
||||||
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
|
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
|
||||||
}
|
}
|
||||||
if httpResp.StatusCode/100 == 5 {
|
if httpResp.StatusCode/100 == 5 {
|
||||||
return RecoverableError{err}
|
return RecoverableError{err, defaultBackoff}
|
||||||
|
}
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||||
|
return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// retryAfterDuration returns the duration for the Retry-After header. In case of any errors, it
|
||||||
|
// returns the defaultBackoff as if the header was never supplied.
|
||||||
|
func retryAfterDuration(t string) model.Duration {
|
||||||
|
parsedDuration, err := time.Parse(http.TimeFormat, t)
|
||||||
|
if err == nil {
|
||||||
|
s := time.Until(parsedDuration).Seconds()
|
||||||
|
return model.Duration(s) * model.Duration(time.Second)
|
||||||
|
}
|
||||||
|
// The duration can be in seconds.
|
||||||
|
d, err := strconv.Atoi(t)
|
||||||
|
if err != nil {
|
||||||
|
return defaultBackoff
|
||||||
|
}
|
||||||
|
return model.Duration(d) * model.Duration(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
// Name uniquely identifies the client.
|
// Name uniquely identifies the client.
|
||||||
func (c Client) Name() string {
|
func (c Client) Name() string {
|
||||||
return c.remoteName
|
return c.remoteName
|
||||||
|
|
|
@ -49,7 +49,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
code: 500,
|
code: 500,
|
||||||
err: RecoverableError{errors.New("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])},
|
err: RecoverableError{errors.New("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen]), defaultBackoff},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,3 +83,30 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
|
||||||
server.Close()
|
server.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryAfterDuration(t *testing.T) {
|
||||||
|
tc := []struct {
|
||||||
|
name string
|
||||||
|
tInput string
|
||||||
|
expected model.Duration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "seconds",
|
||||||
|
tInput: "120",
|
||||||
|
expected: model.Duration(time.Second * 120),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "date-time default",
|
||||||
|
tInput: time.RFC1123, // Expected layout is http.TimeFormat, hence an error.
|
||||||
|
expected: defaultBackoff,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retry-after not provided",
|
||||||
|
tInput: "", // Expected layout is http.TimeFormat, hence an error.
|
||||||
|
expected: defaultBackoff,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range tc {
|
||||||
|
require.Equal(t, c.expected, retryAfterDuration(c.tInput), c.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/relabel"
|
"github.com/prometheus/prometheus/pkg/relabel"
|
||||||
|
@ -1042,6 +1043,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||||
|
|
||||||
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
|
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
|
||||||
backoff := cfg.MinBackoff
|
backoff := cfg.MinBackoff
|
||||||
|
sleepDuration := model.Duration(0)
|
||||||
try := 0
|
try := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -1058,16 +1060,29 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the error is unrecoverable, we should not retry.
|
// If the error is unrecoverable, we should not retry.
|
||||||
if _, ok := err.(RecoverableError); !ok {
|
backoffErr, ok := err.(RecoverableError)
|
||||||
|
if !ok {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sleepDuration = backoff
|
||||||
|
if backoffErr.retryAfter > 0 {
|
||||||
|
sleepDuration = backoffErr.retryAfter
|
||||||
|
level.Info(l).Log("msg", "Retrying after duration specified by Retry-After header", "duration", sleepDuration)
|
||||||
|
} else if backoffErr.retryAfter < 0 {
|
||||||
|
level.Debug(l).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(time.Duration(sleepDuration)):
|
||||||
|
}
|
||||||
|
|
||||||
// If we make it this far, we've encountered a recoverable error and will retry.
|
// If we make it this far, we've encountered a recoverable error and will retry.
|
||||||
onRetry()
|
onRetry()
|
||||||
level.Warn(l).Log("msg", "Failed to send batch, retrying", "err", err)
|
level.Warn(l).Log("msg", "Failed to send batch, retrying", "err", err)
|
||||||
|
|
||||||
time.Sleep(time.Duration(backoff))
|
backoff = sleepDuration * 2
|
||||||
backoff = backoff * 2
|
|
||||||
|
|
||||||
if backoff > cfg.MaxBackoff {
|
if backoff > cfg.MaxBackoff {
|
||||||
backoff = cfg.MaxBackoff
|
backoff = cfg.MaxBackoff
|
||||||
|
|
Loading…
Reference in a new issue