From cef9891cdd88d83d2e6d31f5bbac26886cddf54a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Pazos?= Date: Mon, 23 Oct 2023 12:39:06 -0300 Subject: [PATCH] remote write handler to checks version header --- storage/remote/client.go | 10 +++++----- storage/remote/write_handler.go | 9 +++++++-- storage/remote/write_handler_test.go | 2 ++ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 7439ab725c..7f0615577a 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -83,7 +83,7 @@ func init() { type Client struct { remoteName string // Used to differentiate clients in metrics. urlString string // url.String() - remotewrite11 bool // For write clients, ignored for read clients. + remoteWrite11 bool // For write clients, ignored for read clients. Client *http.Client timeout time.Duration @@ -165,7 +165,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { httpClient.Transport = otelhttp.NewTransport(t) return &Client{ - remotewrite11: conf.RemoteWrite11, + remoteWrite11: conf.RemoteWrite11, remoteName: name, urlString: conf.URL.String(), Client: httpClient, @@ -212,10 +212,10 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpReq.Header.Set("User-Agent", UserAgent) // Set the right header if we're using v1.1 remote write protocol - if c.remotewrite11 { - httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.1") // TODO-RW11: Final value? + if c.remoteWrite11 { + httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) } else { - httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue) } if attempt > 0 { diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 5655117e21..06dd6185b1 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -32,6 +32,12 @@ import ( otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" ) +const ( + RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" + RemoteWriteVersion1HeaderValue = "0.1.0" + RemoteWriteVersion11HeaderValue = "1.1" // TODO-RW11: Final value? +) + type writeHandler struct { logger log.Logger appendable storage.Appendable @@ -68,8 +74,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var err error var req *prompb.WriteRequest var reqWithRefs *prompb.WriteRequestWithRefs - // TODO-RW11: Need to check headers to decide what version is and what to do - if h.enableRemoteWrite11 { + if h.enableRemoteWrite11 && r.Header.Get(RemoteWriteVersionHeader) == RemoteWriteVersion11HeaderValue { reqWithRefs, err = DecodeReducedWriteRequest(r.Body) } else { req, err = DecodeWriteRequest(r.Body) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index ec04985a07..5bdedb9ef1 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -199,6 +199,7 @@ func BenchmarkReducedRemoteWriteHandler(b *testing.B) { require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) require.NoError(b, err) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) reqs = append(reqs, req) } @@ -298,6 +299,7 @@ func TestRemoteWriteHandlerReducedProtocol(t *testing.T) { require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) require.NoError(t, err) appendable := &mockAppendable{}