remote write 2.0 - content negotiation remediation (#13921)

* Consolidate renegotiation error into one, fix tests

Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>

* fix metric name and actuall increment counter

Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>

---------

Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>
This commit is contained in:
Alex Greenbank 2024-04-17 14:41:04 +01:00 committed by GitHub
parent ad77987bdc
commit bb12d690c3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 36 additions and 35 deletions

View file

@ -17,7 +17,6 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
@ -55,13 +54,22 @@ var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
// A Remote Write 2.0 request sent to, for example, a Prometheus 2.50 receiver (which does // A Remote Write 2.0 request sent to, for example, a Prometheus 2.50 receiver (which does
// not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver. // not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver.
var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400
// A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version) // A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version)
// result in an HTTP 406 status code to indicate that it does not accept the protocol or // result in an HTTP 406 status code to indicate that it does not accept the protocol or
// encoding of that request and that the sender should retry with a more suitable protocol // encoding of that request and that the sender should retry with a more suitable protocol
// version or encoding. // version or encoding.
var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406
// We bundle any error we want to return to the queue manager to trigger a renegotiation into
// this custom error.
type ErrRenegotiate struct {
FirstLine string
StatusCode int
}
func (r *ErrRenegotiate) Error() string {
return fmt.Sprintf("HTTP %d: msg: %s", r.StatusCode, r.FirstLine)
}
var ( var (
remoteReadQueriesTotal = prometheus.NewCounterVec( remoteReadQueriesTotal = prometheus.NewCounterVec(
@ -323,14 +331,11 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
case 400: case 400:
// Return an unrecoverable error to indicate the 400. // Return an unrecoverable error to indicate the 400.
// This then gets passed up the chain so we can react to it properly. // This then gets passed up the chain so we can react to it properly.
// TODO(alexg) Do we want to include the first line of the message? return &ErrRenegotiate{line, httpResp.StatusCode}
return ErrStatusBadRequest
case 406: case 406:
// Return an unrecoverable error to indicate the 406. // Return an unrecoverable error to indicate the 406.
// This then gets passed up the chain so we can react to it properly. // This then gets passed up the chain so we can react to it properly.
// TODO(alexg) Do we want to include the first line of the message? return &ErrRenegotiate{line, httpResp.StatusCode}
// TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error?
return ErrStatusNotAcceptable
default: default:
// We want to end up returning a non-specific error. // We want to end up returning a non-specific error.
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)

View file

@ -1625,20 +1625,18 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok { if !ok {
return return
} }
// Resend logic on 406.
// ErrStatusNotAcceptable is a new error defined in client.go.
// Work out what version to send based on the last header seen and the QM's rwFormat setting. // Work out what version to send based on the last header seen and the QM's rwFormat setting.
// TODO(alexg) - see comments below about retry/renegotiate design.
for attemptNos := 1; attemptNos <= 3; attemptNos++ { for attemptNos := 1; attemptNos <= 3; attemptNos++ {
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
sendErr := attemptBatchSend(batch, rwFormat, compression, false) sendErr := attemptBatchSend(batch, rwFormat, compression, false)
if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { pErr := &ErrRenegotiate{}
// It wasn't an error, or wasn't a 406 or 400 so we can just stop trying. if sendErr == nil || !errors.As(sendErr, &pErr) {
// No error, or error wasn't a 406 or 400, so we can stop trying.
break break
} }
// If we get either of the two errors (406, 400) we loop and re-negotiate. // If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate.
// TODO(alexg) - add retry/renegotiate metrics here
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
@ -1654,17 +1652,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
sendErr := attemptBatchSend(batch, rwFormat, compression, true) sendErr := attemptBatchSend(batch, rwFormat, compression, true)
if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { pErr := &ErrRenegotiate{}
// It wasn't an error, or wasn't a 406 or 400 so we can just stop trying. if sendErr == nil || !errors.As(sendErr, &pErr) {
// No error, or error wasn't a 406 or 400, so we can stop trying.
break break
} }
// If we get either of the two errors (406, 400) we loop and re-negotiate. // If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate.
} }
// TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we
// Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format
// Q: Do we always try downgrading to 1.0 at least once?
// Q: Do we limit our attempts to only try a particular protocol/encoding tuple once?
// Q: Is 3 a suitable limit?
// TODO(alexg) - add retry/renegotiate metrics here // TODO(alexg) - add retry/renegotiate metrics here
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)

View file

@ -109,23 +109,23 @@ func TestContentNegotiation(t *testing.T) {
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade. // Now test where the server suddenly stops speaking 2.0 and we need to downgrade.
{ {
success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400. // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400.
{ {
success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 400}, attemptString: "0,1,snappy,HTTP 400: msg: "},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Now test where the server flip flops between "2.0;snappy" and "0.1.0" only. // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only.
{ {
success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,0,snappy,HTTP 406: msg: "},
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
// There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries). // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries).
}, },
}, },

View file

@ -66,7 +66,7 @@ func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
type writeHeadHandler struct { type writeHeadHandler struct {
logger log.Logger logger log.Logger
remoteWrite20HeadRequests prometheus.Counter remoteWriteHeadRequests prometheus.Counter
// Experimental feature, new remote write proto format. // Experimental feature, new remote write proto format.
// The handler will accept the new format, but it can still accept the old one. // The handler will accept the new format, but it can still accept the old one.
@ -77,27 +77,29 @@ func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat
h := &writeHeadHandler{ h := &writeHeadHandler{
logger: logger, logger: logger,
rwFormat: rwFormat, rwFormat: rwFormat,
remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ remoteWriteHeadRequests: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus", Namespace: "prometheus",
Subsystem: "api", Subsystem: "api",
Name: "remote_write_20_head_requests", Name: "remote_write_head_requests",
Help: "The number of remote write 2.0 head requests.", Help: "The number of remote write HEAD requests.",
}), }),
} }
if reg != nil { if reg != nil {
reg.MustRegister(h.remoteWrite20HeadRequests) reg.MustRegister(h.remoteWriteHeadRequests)
} }
return h return h
} }
func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Send a response to the HEAD request based on the format supported. // Send a response to the HEAD request based on the format supported.
func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Add appropriate header values for the specific rwFormat. // Add appropriate header values for the specific rwFormat.
for hName, hValue := range rwHeaderNameValues(h.rwFormat) { for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
w.Header().Set(hName, hValue) w.Header().Set(hName, hValue)
} }
// Increment counter
h.remoteWriteHeadRequests.Inc()
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }