Latest updates to review comments

Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>
This commit is contained in:
Alex Greenbank 2024-04-04 12:56:40 +00:00
parent 7b40203302
commit 4a758f1685
6 changed files with 115 additions and 99 deletions

View file

@ -47,8 +47,20 @@ const maxErrMsgLen = 1024
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
// If we send a Remote Write 2.0 request to a Remote Write endpoint that only understands
// Remote Write 1.0 it will respond with an error. We need to handle these errors
// accordingly. Any 5xx errors will just need to be retried as they are considered
// transient/recoverable errors. A 4xx error will need to be passed back to the queue
// manager in order to be re-encoded in a suitable format.
// A Remote Write 2.0 request sent to, for example, a Prometheus 2.50 receiver (which does
// not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver.
var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400
// A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version)
// result in an HTTP 406 status code to indicate that it does not accept the protocol or
// encoding of that request and that the sender should retry with a more suitable protocol
// version or encoding.
var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406
var ( var (
@ -205,7 +217,7 @@ type RecoverableError struct {
// Attempt a HEAD request against a remote write endpoint to see what it supports. // Attempt a HEAD request against a remote write endpoint to see what it supports.
func (c *Client) probeRemoteVersions(ctx context.Context) error { func (c *Client) probeRemoteVersions(ctx context.Context) error {
// We assume we are in Version2 mode otherwise we shouldn't be calling this // We assume we are in Version2 mode otherwise we shouldn't be calling this.
httpReq, err := http.NewRequest("HEAD", c.urlString, nil) httpReq, err := http.NewRequest("HEAD", c.urlString, nil)
if err != nil { if err != nil {
@ -214,7 +226,7 @@ func (c *Client) probeRemoteVersions(ctx context.Context) error {
return err return err
} }
// Set the version header to be nice // Set the version header to be nice.
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
httpReq.Header.Set("User-Agent", UserAgent) httpReq.Header.Set("User-Agent", UserAgent)
@ -223,32 +235,32 @@ func (c *Client) probeRemoteVersions(ctx context.Context) error {
httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
if err != nil { if err != nil {
// We don't attempt a retry here // We don't attempt a retry here.
return err return err
} }
// See if we got a header anyway // See if we got a header anyway.
promHeader := httpResp.Header.Get(RemoteWriteVersionHeader) promHeader := httpResp.Header.Get(RemoteWriteVersionHeader)
// Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank.
if promHeader != "" { if promHeader != "" {
c.lastRWHeader = promHeader c.lastRWHeader = promHeader
} }
// Check for an error // Check for an error.
if httpResp.StatusCode != 200 { if httpResp.StatusCode != 200 {
if httpResp.StatusCode == 405 { if httpResp.StatusCode == 405 {
// If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't // If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't
// understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten // understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten
// even if it is blank // even if it is blank.
// This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives // This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives
// a response that confirms it can speak 2.0 // a response that confirms it can speak 2.0.
c.lastRWHeader = promHeader c.lastRWHeader = promHeader
} }
return fmt.Errorf(httpResp.Status) return fmt.Errorf(httpResp.Status)
} }
// All ok, return no error // All ok, return no error.
return nil return nil
} }
@ -269,7 +281,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
if rwFormat == Version1 { if rwFormat == Version1 {
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue) httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue)
} else { } else {
// Set the right header if we're using v2.0 remote write protocol // Set the right header if we're using v2.0 remote write protocol.
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
} }
@ -294,9 +306,9 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
httpResp.Body.Close() httpResp.Body.Close()
}() }()
// See if we got a X-Prometheus-Remote-Write header in the response // See if we got a X-Prometheus-Remote-Write header in the response.
if promHeader := httpResp.Header.Get(RemoteWriteVersionHeader); promHeader != "" { if promHeader := httpResp.Header.Get(RemoteWriteVersionHeader); promHeader != "" {
// Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank.
// (It's blank if it wasn't present, we don't care about that distinction.) // (It's blank if it wasn't present, we don't care about that distinction.)
c.lastRWHeader = promHeader c.lastRWHeader = promHeader
} }
@ -309,18 +321,18 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
} }
switch httpResp.StatusCode { switch httpResp.StatusCode {
case 400: case 400:
// Return an unrecoverable error to indicate the 400 // Return an unrecoverable error to indicate the 400.
// This then gets passed up the chain so we can react to it properly // This then gets passed up the chain so we can react to it properly.
// TODO(alexg) Do we want to include the first line of the message? // TODO(alexg) Do we want to include the first line of the message?
return ErrStatusBadRequest return ErrStatusBadRequest
case 406: case 406:
// Return an unrecoverable error to indicate the 406 // Return an unrecoverable error to indicate the 406.
// This then gets passed up the chain so we can react to it properly // This then gets passed up the chain so we can react to it properly.
// TODO(alexg) Do we want to include the first line of the message? // TODO(alexg) Do we want to include the first line of the message?
// TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error? // TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error?
return ErrStatusNotAcceptable return ErrStatusNotAcceptable
default: default:
// We want to end up returning a non-specific error // We want to end up returning a non-specific error.
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
} }
} }

View file

@ -394,15 +394,15 @@ type WriteClient interface {
Name() string Name() string
// Endpoint is the remote read or write endpoint for the storage client. // Endpoint is the remote read or write endpoint for the storage client.
Endpoint() string Endpoint() string
// Get the protocol versions supported by the endpoint // Get the protocol versions supported by the endpoint.
probeRemoteVersions(ctx context.Context) error probeRemoteVersions(ctx context.Context) error
// Get the last RW header received from the endpoint // Get the last RW header received from the endpoint.
GetLastRWHeader() string GetLastRWHeader() string
} }
const ( const (
Version1 config.RemoteWriteFormat = iota // 1.0, 0.1, etc. Version1 config.RemoteWriteFormat = iota // 1.0, 0.1, etc.
Version2 // symbols are indices into an array of strings Version2 // symbols are indices into an array of strings.
) )
// QueueManager manages a queue of samples to be sent to the Storage // QueueManager manages a queue of samples to be sent to the Storage
@ -576,7 +576,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples. // Build the WriteRequest with no samples.
// Get compression to use from content negotiation based on last header seen (defaults to snappy) // Get compression to use from content negotiation based on last header seen (defaults to snappy).
compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader()) compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader())
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression) req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression)
@ -1500,26 +1500,25 @@ func (q *queue) newBatch(capacity int) []timeSeries {
func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) { func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) {
if rwFormat == Version1 { if rwFormat == Version1 {
// If we're only handling Version1 then all we can do is that with snappy compression // If we're only handling Version1 then all we can do is that with snappy compression.
return "snappy", Version1 return "snappy", Version1
} }
if rwFormat != Version2 { if rwFormat != Version2 {
// If we get here then someone has added a new RemoteWriteFormat value but hasn't // If we get here then someone has added a new RemoteWriteFormat value but hasn't
// fixed this function to handle it // fixed this function to handle it. Panic!
// panic!
panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat)) panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat))
} }
if lastHeaderSeen == "" { if lastHeaderSeen == "" {
// We haven't had a valid header, so we just default to 0.1.0/snappy // We haven't had a valid header, so we just default to "0.1.0/snappy".
return "snappy", Version1 return "snappy", Version1
} }
// We can currently handle: // We can currently handle:
// "2.0;snappy" // "2.0;snappy"
// "0.1.0" - implicit compression of snappy // "0.1.0" - implicit compression of snappy
// lastHeaderSeen should contain a list of tuples // lastHeaderSeen should contain a list of tuples.
// If we find a match to something we can handle then we can return that // If we find a match to something we can handle then we can return that.
for _, tuple := range strings.Split(lastHeaderSeen, ",") { for _, tuple := range strings.Split(lastHeaderSeen, ",") {
// Remove spaces from the tuple // Remove spaces from the tuple.
curr := strings.ReplaceAll(tuple, " ", "") curr := strings.ReplaceAll(tuple, " ", "")
switch curr { switch curr {
case "2.0;snappy": case "2.0;snappy":
@ -1529,7 +1528,7 @@ func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string)
} }
} }
// Otherwise we have to default to "0.1.0" // Otherwise we have to default to "0.1.0".
return "snappy", Version1 return "snappy", Version1
} }
@ -1626,20 +1625,20 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok { if !ok {
return return
} }
// Resend logic on 406 // Resend logic on 406.
// ErrStatusNotAcceptable is a new error defined in client.go // ErrStatusNotAcceptable is a new error defined in client.go.
// Work out what version to send based on the last header seen and the QM's rwFormat setting // Work out what version to send based on the last header seen and the QM's rwFormat setting.
// TODO(alexg) - see comments below about retry/renegotiate design // TODO(alexg) - see comments below about retry/renegotiate design.
for attemptNos := 1; attemptNos <= 3; attemptNos++ { for attemptNos := 1; attemptNos <= 3; attemptNos++ {
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
sendErr := attemptBatchSend(batch, rwFormat, compression, false) sendErr := attemptBatchSend(batch, rwFormat, compression, false)
if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) {
// It wasn't an error, or wasn't a 406 or 400 so we can just stop trying // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying.
break break
} }
// If we get either of the two errors (406, 400) we loop and re-negotiate // If we get either of the two errors (406, 400) we loop and re-negotiate.
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
@ -1651,15 +1650,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
batch := queue.Batch() batch := queue.Batch()
if len(batch) > 0 { if len(batch) > 0 {
for attemptNos := 1; attemptNos <= 3; attemptNos++ { for attemptNos := 1; attemptNos <= 3; attemptNos++ {
// Work out what version to send based on the last header seen and the QM's rwFormat setting // Work out what version to send based on the last header seen and the QM's rwFormat setting.
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
sendErr := attemptBatchSend(batch, rwFormat, compression, true) sendErr := attemptBatchSend(batch, rwFormat, compression, true)
if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) {
// It wasn't an error, or wasn't a 406 or 400 so we can just stop trying // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying.
break break
} }
// If we get either of the two errors (406, 400) we loop and re-negotiate // If we get either of the two errors (406, 400) we loop and re-negotiate.
} }
// TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we // TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we
// Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format // Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format
@ -1720,7 +1719,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression) err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
// Return the error in case it is a 406 and we need to reformat the data // Return the error in case it is a 406 and we need to reformat the data.
return err return err
} }
@ -1729,7 +1728,7 @@ func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression) err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
// Return the error in case it is a 406 and we need to reformat the data // Return the error in case it is a 406 and we need to reformat the data.
return err return err
} }
@ -2095,6 +2094,22 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
} }
func compressPayload(tmpbuf *[]byte, inp []byte, compression string) ([]byte, error) {
var compressed []byte
switch compression {
case "snappy":
compressed = snappy.Encode(*tmpbuf, inp)
if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) {
// grow the buffer for the next time
*tmpbuf = make([]byte, n)
}
return compressed, nil
default:
return compressed, fmt.Errorf("Unknown compression scheme [%s]", compression)
}
}
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, compression string) ([]byte, int64, int64, error) { func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, compression string) ([]byte, int64, int64, error) {
highest, lowest, timeSeries, highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
@ -2128,16 +2143,11 @@ func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metada
var compressed []byte var compressed []byte
switch compression { compressed, err = compressPayload(buf, pBuf.Bytes(), compression)
case "snappy": if err != nil {
compressed = snappy.Encode(*buf, pBuf.Bytes()) return nil, highest, lowest, err
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
default:
return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression)
} }
return compressed, highest, lowest, nil return compressed, highest, lowest, nil
} }
@ -2205,15 +2215,9 @@ func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels
var compressed []byte var compressed []byte
switch compression { compressed, err = compressPayload(buf, data, compression)
case "snappy": if err != nil {
compressed = snappy.Encode(*buf, data) return nil, highest, lowest, err
if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
default:
return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression)
} }
return compressed, highest, lowest, nil return compressed, highest, lowest, nil

View file

@ -192,19 +192,19 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
continue continue
} }
// Work out what protocol and compression to use for this endpoint // Work out what protocol and compression to use for this endpoint.
// Default to Remote Write Version1 // Default to Remote Write Version1.
rwFormat := Version1 rwFormat := Version1
switch rwConf.ProtocolVersion { switch rwConf.ProtocolVersion {
case Version1: case Version1:
// We use the standard value as there's no negotiation to be had // We use the standard value as there's no negotiation to be had.
case Version2: case Version2:
rwFormat = Version2 rwFormat = Version2
// If this newer remote write format is enabled then we need to probe the remote server // If this newer remote write format is enabled then we need to probe the remote server
// to work out the desired protocol version and compressions // to work out the desired protocol version and compressions.
// The value of the header is kept in the client so no need to see it here // The value of the header is kept in the client so no need to see it here.
_ = c.probeRemoteVersions(context.Background()) _ = c.probeRemoteVersions(context.Background())
// We ignore any error here, at some point we may choose to log it // We ignore any error here, at some point we may choose to log it.
} }
// Redacted to remove any passwords in the URL (that are // Redacted to remove any passwords in the URL (that are

View file

@ -45,7 +45,7 @@ const (
) )
func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string { func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
// Return the correct remote write header name/values based on provided rwFormat // Return the correct remote write header name/values based on provided rwFormat.
ret := make(map[string]string, 1) ret := make(map[string]string, 1)
switch rwFormat { switch rwFormat {
@ -54,9 +54,9 @@ func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
case Version2: case Version2:
// We need to add the supported protocol definitions in order: // We need to add the supported protocol definitions in order:
tuples := make([]string, 0, 2) tuples := make([]string, 0, 2)
// Add 2.0;snappy // Add "2.0;snappy".
tuples = append(tuples, RemoteWriteVersion20HeaderValue+";snappy") tuples = append(tuples, RemoteWriteVersion20HeaderValue+";snappy")
// Add default 0.1.0 // Add default "0.1.0".
tuples = append(tuples, RemoteWriteVersion1HeaderValue) tuples = append(tuples, RemoteWriteVersion1HeaderValue)
ret[RemoteWriteVersionHeader] = strings.Join(tuples, ",") ret[RemoteWriteVersionHeader] = strings.Join(tuples, ",")
} }
@ -68,8 +68,8 @@ type writeHeadHandler struct {
remoteWrite20HeadRequests prometheus.Counter remoteWrite20HeadRequests prometheus.Counter
// Experimental feature, new remote write proto format // Experimental feature, new remote write proto format.
// The handler will accept the new format, but it can still accept the old one // The handler will accept the new format, but it can still accept the old one.
rwFormat config.RemoteWriteFormat rwFormat config.RemoteWriteFormat
} }
@ -91,9 +91,9 @@ func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat
} }
func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Send a response to the HEAD request based on the format supported // Send a response to the HEAD request based on the format supported.
// Add appropriate header values for the specific rwFormat // Add appropriate header values for the specific rwFormat.
for hName, hValue := range rwHeaderNameValues(h.rwFormat) { for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
w.Header().Set(hName, hValue) w.Header().Set(hName, hValue)
} }
@ -107,8 +107,8 @@ type writeHandler struct {
samplesWithInvalidLabelsTotal prometheus.Counter samplesWithInvalidLabelsTotal prometheus.Counter
// Experimental feature, new remote write proto format // Experimental feature, new remote write proto format.
// The handler will accept the new format, but it can still accept the old one // The handler will accept the new format, but it can still accept the old one.
rwFormat config.RemoteWriteFormat rwFormat config.RemoteWriteFormat
} }
@ -135,36 +135,36 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error var err error
// Set the header(s) in the response based on the rwFormat the server supports // Set the header(s) in the response based on the rwFormat the server supports.
for hName, hValue := range rwHeaderNameValues(h.rwFormat) { for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
w.Header().Set(hName, hValue) w.Header().Set(hName, hValue)
} }
// Parse the headers to work out how to handle this // Parse the headers to work out how to handle this.
contentEncoding := r.Header.Get("Content-Encoding") contentEncoding := r.Header.Get("Content-Encoding")
protoVer := r.Header.Get(RemoteWriteVersionHeader) protoVer := r.Header.Get(RemoteWriteVersionHeader)
switch protoVer { switch protoVer {
case "": case "":
// No header provided, assume 0.1.0 as everything that relies on later // No header provided, assume 0.1.0 as everything that relies on later.
protoVer = RemoteWriteVersion1HeaderValue protoVer = RemoteWriteVersion1HeaderValue
case RemoteWriteVersion1HeaderValue, RemoteWriteVersion20HeaderValue: case RemoteWriteVersion1HeaderValue, RemoteWriteVersion20HeaderValue:
// We know this header, woo // We know this header, woo.
default: default:
// We have a version in the header but it is not one we recognise // We have a version in the header but it is not one we recognise.
// TODO(alexg) - make a proper error for this? // TODO(alexg) - make a proper error for this?
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer)
// Return a 406 so that the client can choose a more appropriate protocol to use // Return a 406 so that the client can choose a more appropriate protocol to use.
http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable)
return return
} }
// Deal with 0.1.0 clients that forget to send Content-Encoding // Deal with 0.1.0 clients that forget to send Content-Encoding.
if protoVer == RemoteWriteVersion1HeaderValue && contentEncoding == "" { if protoVer == RemoteWriteVersion1HeaderValue && contentEncoding == "" {
contentEncoding = "snappy" contentEncoding = "snappy"
} }
// Read the request body // Read the request body.
body, err := io.ReadAll(r.Body) body, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
@ -172,7 +172,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
// Deal with contentEncoding first // Deal with contentEncoding first.
var decompressed []byte var decompressed []byte
switch contentEncoding { switch contentEncoding {
@ -185,13 +185,13 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
default: default:
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding)
// Return a 406 so that the client can choose a more appropriate protocol to use // Return a 406 so that the client can choose a more appropriate protocol to use.
http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable) http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable)
return return
} }
// Now we have a decompressed buffer we can unmarshal it // Now we have a decompressed buffer we can unmarshal it.
// At this point we are happy with the version but need to check the encoding // At this point we are happy with the version but need to check the encoding.
switch protoVer { switch protoVer {
case RemoteWriteVersion1HeaderValue: case RemoteWriteVersion1HeaderValue:
var req prompb.WriteRequest var req prompb.WriteRequest
@ -202,7 +202,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
err = h.write(r.Context(), &req) err = h.write(r.Context(), &req)
case RemoteWriteVersion20HeaderValue: case RemoteWriteVersion20HeaderValue:
// 2.0 request // 2.0 request.
var reqMinStr writev2.WriteRequest var reqMinStr writev2.WriteRequest
if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil { if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())

View file

@ -51,19 +51,19 @@ func TestRemoteWriteHeadHandler(t *testing.T) {
resp := recorder.Result() resp := recorder.Result()
require.Equal(t, http.StatusOK, resp.StatusCode) require.Equal(t, http.StatusOK, resp.StatusCode)
// Check header is expected value // Check header is expected value.
protHeader := resp.Header.Get(RemoteWriteVersionHeader) protHeader := resp.Header.Get(RemoteWriteVersionHeader)
require.Equal(t, "2.0;snappy,0.1.0", protHeader) require.Equal(t, "2.0;snappy,0.1.0", protHeader)
} }
func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) { func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
// Send a v2 request without a "Content-Encoding:" header -> 406 // Send a v2 request without a "Content-Encoding:" header -> 406.
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
// Do not provide "Content-Encoding: snappy" header // Do not provide "Content-Encoding: snappy" header.
// req.Header.Set("Content-Encoding", "snappy") // req.Header.Set("Content-Encoding", "snappy")
require.NoError(t, err) require.NoError(t, err)
@ -74,12 +74,12 @@ func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
resp := recorder.Result() resp := recorder.Result()
// Should give us a 406 // Should give us a 406.
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
} }
func TestRemoteWriteHandlerInvalidCompression(t *testing.T) { func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
// Send a v2 request without an unhandled compression scheme -> 406 // Send a v2 request without an unhandled compression scheme -> 406.
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
require.NoError(t, err) require.NoError(t, err)
@ -95,12 +95,12 @@ func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
resp := recorder.Result() resp := recorder.Result()
// Expect a 406 // Expect a 406.
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
} }
func TestRemoteWriteHandlerInvalidVersion(t *testing.T) { func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
// Send a protocol version number that isn't recognised/supported -> 406 // Send a protocol version number that isn't recognised/supported -> 406.
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
require.NoError(t, err) require.NoError(t, err)
@ -115,7 +115,7 @@ func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
resp := recorder.Result() resp := recorder.Result()
// Expect a 406 // Expect a 406.
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
} }
@ -135,7 +135,7 @@ func TestRemoteWriteHandler(t *testing.T) {
resp := recorder.Result() resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode) require.Equal(t, http.StatusNoContent, resp.StatusCode)
// Check header is expected value // Check header is expected value.
protHeader := resp.Header.Get(RemoteWriteVersionHeader) protHeader := resp.Header.Get(RemoteWriteVersionHeader)
require.Equal(t, "0.1.0", protHeader) require.Equal(t, "0.1.0", protHeader)
@ -175,7 +175,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
// Must provide "Content-Encoding: snappy" header // Must provide "Content-Encoding: snappy" header.
req.Header.Set("Content-Encoding", "snappy") req.Header.Set("Content-Encoding", "snappy")
require.NoError(t, err) require.NoError(t, err)
@ -188,7 +188,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
resp := recorder.Result() resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode) require.Equal(t, http.StatusNoContent, resp.StatusCode)
// Check header is expected value // Check header is expected value.
protHeader := resp.Header.Get(RemoteWriteVersionHeader) protHeader := resp.Header.Get(RemoteWriteVersionHeader)
require.Equal(t, "2.0;snappy,0.1.0", protHeader) require.Equal(t, "2.0;snappy,0.1.0", protHeader)
@ -196,7 +196,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
j := 0 j := 0
k := 0 k := 0
// the reduced write request is equivalent to the write request fixture. // the reduced write request is equivalent to the write request fixture.
// we can use it for // we can use it for.
for _, ts := range writeRequestMinimizedFixture.Timeseries { for _, ts := range writeRequestMinimizedFixture.Timeseries {
ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols) ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols)
for _, s := range ts.Samples { for _, s := range ts.Samples {

View file

@ -297,7 +297,7 @@ func NewAPI(
} }
if rwEnabled { if rwEnabled {
// TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising // TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising.
// For rollout we do two phases: // For rollout we do two phases:
// 0. (Before) no flags set // 0. (Before) no flags set
// 1. (During) support new protocols but don't advertise // 1. (During) support new protocols but don't advertise