From bb12d690c395917f0308af45fe5aa57422a5b6f4 Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Wed, 17 Apr 2024 14:41:04 +0100 Subject: [PATCH] remote write 2.0 - content negotiation remediation (#13921) * Consolidate renegotiation error into one, fix tests Signed-off-by: Alex Greenbank * fix metric name and actuall increment counter Signed-off-by: Alex Greenbank --------- Signed-off-by: Alex Greenbank --- storage/remote/client.go | 21 +++++++++++++-------- storage/remote/queue_manager.go | 24 +++++++++--------------- storage/remote/queue_manager_test.go | 10 +++++----- storage/remote/write_handler.go | 16 +++++++++------- 4 files changed, 36 insertions(+), 35 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 2f1cc7581c..0426990367 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -17,7 +17,6 @@ import ( "bufio" "bytes" "context" - "errors" "fmt" "io" "math/rand" @@ -55,13 +54,22 @@ var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) // A Remote Write 2.0 request sent to, for example, a Prometheus 2.50 receiver (which does // not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver. -var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 // A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version) // result in an HTTP 406 status code to indicate that it does not accept the protocol or // encoding of that request and that the sender should retry with a more suitable protocol // version or encoding. -var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 + +// We bundle any error we want to return to the queue manager to trigger a renegotiation into +// this custom error. +type ErrRenegotiate struct { + FirstLine string + StatusCode int +} + +func (r *ErrRenegotiate) Error() string { + return fmt.Sprintf("HTTP %d: msg: %s", r.StatusCode, r.FirstLine) +} var ( remoteReadQueriesTotal = prometheus.NewCounterVec( @@ -323,14 +331,11 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co case 400: // Return an unrecoverable error to indicate the 400. // This then gets passed up the chain so we can react to it properly. - // TODO(alexg) Do we want to include the first line of the message? - return ErrStatusBadRequest + return &ErrRenegotiate{line, httpResp.StatusCode} case 406: // Return an unrecoverable error to indicate the 406. // This then gets passed up the chain so we can react to it properly. - // TODO(alexg) Do we want to include the first line of the message? - // TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error? - return ErrStatusNotAcceptable + return &ErrRenegotiate{line, httpResp.StatusCode} default: // We want to end up returning a non-specific error. err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 15773bbcdf..5f423a2546 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1625,20 +1625,18 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - // Resend logic on 406. - // ErrStatusNotAcceptable is a new error defined in client.go. - // Work out what version to send based on the last header seen and the QM's rwFormat setting. - // TODO(alexg) - see comments below about retry/renegotiate design. for attemptNos := 1; attemptNos <= 3; attemptNos++ { lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) sendErr := attemptBatchSend(batch, rwFormat, compression, false) - if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { - // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying. + pErr := &ErrRenegotiate{} + if sendErr == nil || !errors.As(sendErr, &pErr) { + // No error, or error wasn't a 406 or 400, so we can stop trying. break } - // If we get either of the two errors (406, 400) we loop and re-negotiate. + // If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate. + // TODO(alexg) - add retry/renegotiate metrics here } queue.ReturnForReuse(batch) @@ -1654,17 +1652,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) sendErr := attemptBatchSend(batch, rwFormat, compression, true) - if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { - // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying. + pErr := &ErrRenegotiate{} + if sendErr == nil || !errors.As(sendErr, &pErr) { + // No error, or error wasn't a 406 or 400, so we can stop trying. break } - // If we get either of the two errors (406, 400) we loop and re-negotiate. + // If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate. } - // TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we - // Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format - // Q: Do we always try downgrading to 1.0 at least once? - // Q: Do we limit our attempts to only try a particular protocol/encoding tuple once? - // Q: Is 3 a suitable limit? // TODO(alexg) - add retry/renegotiate metrics here } queue.ReturnForReuse(batch) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 447932987b..d99a393e15 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -109,23 +109,23 @@ func TestContentNegotiation(t *testing.T) { // Now test where the server suddenly stops speaking 2.0 and we need to downgrade. { success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, }, }, // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400. { success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 400}, attemptString: "0,1,snappy,HTTP 400: msg: "}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, }, }, // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only. { success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, - {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,0,snappy,HTTP 406: msg: "}, + {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries). }, }, diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index b80799706d..8eed1b5399 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -66,7 +66,7 @@ func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string { type writeHeadHandler struct { logger log.Logger - remoteWrite20HeadRequests prometheus.Counter + remoteWriteHeadRequests prometheus.Counter // Experimental feature, new remote write proto format. // The handler will accept the new format, but it can still accept the old one. @@ -77,27 +77,29 @@ func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat h := &writeHeadHandler{ logger: logger, rwFormat: rwFormat, - remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ + remoteWriteHeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "api", - Name: "remote_write_20_head_requests", - Help: "The number of remote write 2.0 head requests.", + Name: "remote_write_head_requests", + Help: "The number of remote write HEAD requests.", }), } if reg != nil { - reg.MustRegister(h.remoteWrite20HeadRequests) + reg.MustRegister(h.remoteWriteHeadRequests) } return h } +// Send a response to the HEAD request based on the format supported. func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Send a response to the HEAD request based on the format supported. - // Add appropriate header values for the specific rwFormat. for hName, hValue := range rwHeaderNameValues(h.rwFormat) { w.Header().Set(hName, hValue) } + // Increment counter + h.remoteWriteHeadRequests.Inc() + w.WriteHeader(http.StatusOK) }