diff --git a/storage/remote/client.go b/storage/remote/client.go index 3acdd60089..ee043c4886 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -56,7 +56,7 @@ var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) // not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver. // A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version) -// result in an HTTP 406 status code to indicate that it does not accept the protocol or +// result in an HTTP 415 status code to indicate that it does not accept the protocol or // encoding of that request and that the sender should retry with a more suitable protocol // version or encoding. @@ -226,55 +226,6 @@ type RecoverableError struct { retryAfter model.Duration } -// Attempt a HEAD request against a remote write endpoint to see what it supports. -func (c *Client) probeRemoteVersions(ctx context.Context) error { - // We assume we are in Version2 mode otherwise we shouldn't be calling this. - - httpReq, err := http.NewRequest(http.MethodHead, c.urlString, nil) - if err != nil { - // Errors from NewRequest are from unparsable URLs, so are not - // recoverable. - return err - } - - // Set the version header to be nice. - httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - httpReq.Header.Set("User-Agent", UserAgent) - - ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() - - httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) - if err != nil { - // We don't attempt a retry here. - return err - } - - // See if we got a header anyway. - promHeader := httpResp.Header.Get(RemoteWriteVersionHeader) - - // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank. - if promHeader != "" { - c.lastRWHeader = promHeader - } - - // Check for an error. - if httpResp.StatusCode != http.StatusOK { - if httpResp.StatusCode == http.StatusMethodNotAllowed { - // If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't - // understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten - // even if it is blank. - // This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives - // a response that confirms it can speak 2.0. - c.lastRWHeader = promHeader - } - return fmt.Errorf(httpResp.Status) - } - - // All ok, return no error. - return nil -} - // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // and encoded bytes from codec.go. func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat config.RemoteWriteFormat, compression string) error { @@ -335,8 +286,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co // Return an unrecoverable error to indicate the 400. // This then gets passed up the chain so we can react to it properly. return &ErrRenegotiate{line, httpResp.StatusCode} - case http.StatusNotAcceptable: - // Return an unrecoverable error to indicate the 406. + case http.StatusUnsupportedMediaType: + // Return an unrecoverable error to indicate the 415. // This then gets passed up the chain so we can react to it properly. return &ErrRenegotiate{line, httpResp.StatusCode} default: @@ -377,10 +328,6 @@ func (c Client) Endpoint() string { return c.urlString } -func (c *Client) GetLastRWHeader() string { - return c.lastRWHeader -} - // Read reads from a remote endpoint. func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { c.readQueries.Inc() @@ -463,10 +410,6 @@ func NewTestClient(name, url string) WriteClient { return &TestClient{name: name, url: url} } -func (c *TestClient) probeRemoteVersions(_ context.Context) error { - return nil -} - func (c *TestClient) Store(_ context.Context, req []byte, _ int, _ config.RemoteWriteFormat, _ string) error { r := rand.Intn(200-100) + 100 time.Sleep(time.Duration(r) * time.Millisecond) @@ -480,7 +423,3 @@ func (c *TestClient) Name() string { func (c *TestClient) Endpoint() string { return c.url } - -func (c *TestClient) GetLastRWHeader() string { - return "2.0;snappy,0.1.0" -} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ce9f44b82f..4204452f34 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "strconv" - "strings" "sync" "time" @@ -397,10 +396,6 @@ type WriteClient interface { Name() string // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string - // Get the protocol versions supported by the endpoint. - probeRemoteVersions(ctx context.Context) error - // Get the last RW header received from the endpoint. - GetLastRWHeader() string } const ( @@ -582,7 +577,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p // Build the WriteRequest with no samples. // Get compression to use from content negotiation based on last header seen (defaults to snappy). - compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader()) + compression := "snappy" req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression) if err != nil { @@ -1510,40 +1505,6 @@ func (q *queue) newBatch(capacity int) []timeSeries { return make([]timeSeries, 0, capacity) } -func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) { - if rwFormat == Version1 { - // If we're only handling Version1 then all we can do is that with snappy compression. - return "snappy", Version1 - } - if rwFormat != Version2 { - // If we get here then someone has added a new RemoteWriteFormat value but hasn't - // fixed this function to handle it. Panic! - panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat)) - } - if lastHeaderSeen == "" { - // We haven't had a valid header, so we just default to "0.1.0/snappy". - return "snappy", Version1 - } - // We can currently handle: - // "2.0;snappy" - // "0.1.0" - implicit compression of snappy - // lastHeaderSeen should contain a list of tuples. - // If we find a match to something we can handle then we can return that. - for _, tuple := range strings.Split(lastHeaderSeen, ",") { - // Remove spaces from the tuple. - curr := strings.ReplaceAll(tuple, " ", "") - switch curr { - case "2.0;snappy": - return "snappy", Version2 - case "0.1.0": - return "snappy", Version1 - } - } - - // Otherwise we have to default to "0.1.0". - return "snappy", Version1 -} - func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { defer func() { if s.running.Dec() == 0 { @@ -1637,17 +1598,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - // Work out what version to send based on the last header seen and the QM's rwFormat setting. - for attemptNos := 1; attemptNos <= 3; attemptNos++ { - lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() - compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) - sendErr := attemptBatchSend(batch, rwFormat, compression, false) - pErr := &ErrRenegotiate{} - if sendErr == nil || !errors.As(sendErr, &pErr) { - // No error, or error wasn't a 406 or 400, so we can stop trying. - break - } - // If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate. + compression := "snappy" + rwFormat := s.qm.rwFormat + // TODO(alexg): Need to get rwFormat from somewhere + sendErr := attemptBatchSend(batch, rwFormat, compression, false) + pErr := &ErrRenegotiate{} + if sendErr != nil && errors.As(sendErr, &pErr) { + // If we get either of the two errors (415, 400) bundled in ErrRenegotiate we want to log and metric // TODO(alexg) - add retry/renegotiate metrics here } @@ -1659,19 +1616,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - for attemptNos := 1; attemptNos <= 3; attemptNos++ { - // Work out what version to send based on the last header seen and the QM's rwFormat setting. - lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() - compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) - sendErr := attemptBatchSend(batch, rwFormat, compression, true) - pErr := &ErrRenegotiate{} - if sendErr == nil || !errors.As(sendErr, &pErr) { - // No error, or error wasn't a 406 or 400, so we can stop trying. - break - } - // If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate. + compression := "snappy" + // TODO(alexg): Need to get rwFormat from somewhere + rwFormat := s.qm.rwFormat + sendErr := attemptBatchSend(batch, rwFormat, compression, false) + pErr := &ErrRenegotiate{} + if sendErr != nil && errors.As(sendErr, &pErr) { + // If we get either of the two errors (415, 400) bundled in ErrRenegotiate we want to log and metric + // TODO(alexg) - add retry/renegotiate metrics here } - // TODO(alexg) - add retry/renegotiate metrics here } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1725,7 +1678,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin)) - // Return the error in case it is a 406 and we need to reformat the data. + // Return the error in case it is a 415 and we need to reformat the data. return err } @@ -1734,7 +1687,7 @@ func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin)) - // Return the error in case it is a 406 and we need to reformat the data. + // Return the error in case it is a 415 and we need to reformat the data. return err } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 34f8662e15..800c7c27e0 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -63,146 +63,6 @@ func newHighestTimestampMetric() *maxTimestamp { } } -type contentNegotiationStep struct { - lastRWHeader string - compression string - behaviour error // or nil - attemptString string -} - -func TestContentNegotiation(t *testing.T) { - testcases := []struct { - name string - success bool - qmRwFormat config.RemoteWriteFormat - rwFormat config.RemoteWriteFormat - steps []contentNegotiationStep - }{ - // Test a simple case where the v2 request we send is processed first time. - { - success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, - }, - }, - // Test a simple case where the v1 request we send is processed first time. - { - success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, - }, - }, - // Test a case where the v1 request has a temporary delay but goes through on retry. - // There is no content re-negotiation between first and retry attempts. - { - success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, - }, - }, - // Repeat the above test but with v2. The request has a temporary delay but goes through on retry. - // There is no content re-negotiation between first and retry attempts. - { - success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, - }, - }, - // Now test where the server suddenly stops speaking 2.0 and we need to downgrade. - { - success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, - }, - }, - // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400. - { - success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 400}, attemptString: "0,1,snappy,HTTP 400: msg: "}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, - }, - }, - // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only. - { - success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,0,snappy,HTTP 406: msg: "}, - {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, - // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries). - }, - }, - } - - queueConfig := config.DefaultQueueConfig - queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond) - queueConfig.MaxShards = 1 - - // We need to set URL's so that metric creation doesn't panic. - writeConfig := baseRemoteWriteConfig("http://test-storage.com") - writeConfig.QueueConfig = queueConfig - - conf := &config.Config{ - GlobalConfig: config.DefaultGlobalConfig, - RemoteWriteConfigs: []*config.RemoteWriteConfig{ - writeConfig, - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true) - defer s.Close() - - var ( - series []record.RefSeries - metadata []record.RefMetadata - samples []record.RefSample - ) - - // Generates same series in both cases. - samples, series = createTimeseries(1, 1) - metadata = createSeriesMetadata(series) - - // Apply new config. - queueConfig.Capacity = len(samples) - queueConfig.MaxSamplesPerSend = len(samples) - // For now we only ever have a single rw config in this test. - conf.RemoteWriteConfigs[0].ProtocolVersion = tc.qmRwFormat - require.NoError(t, s.ApplyConfig(conf)) - hash, err := toHash(writeConfig) - require.NoError(t, err) - qm := s.rws.queues[hash] - - c := NewTestWriteClient(tc.rwFormat) - c.setSteps(tc.steps) // set expected behaviour. - qm.SetClient(c) - - qm.StoreSeries(series, 0) - qm.StoreMetadata(metadata) - - // Did we expect some data back? - if tc.success { - c.expectSamples(samples, series) - } - qm.Append(samples) - - if !tc.success { - // We just need to sleep for a bit to give it time to run. - time.Sleep(2 * time.Second) - // But we still need to check for data with no delay to avoid race. - c.waitForExpectedData(t, 0*time.Second) - } else { - // We expected data so wait for it. - c.waitForExpectedData(t, 5*time.Second) - } - - require.Equal(t, len(c.sendAttempts), len(tc.steps)) - for i, s := range c.sendAttempts { - require.Equal(t, s, tc.steps[i].attemptString) - } - }) - } -} - func TestSampleDelivery(t *testing.T) { testcases := []struct { name string @@ -968,9 +828,6 @@ type TestWriteClient struct { mtx sync.Mutex buf []byte rwFormat config.RemoteWriteFormat - sendAttempts []string - steps []contentNegotiationStep - currstep int retry bool } @@ -983,12 +840,6 @@ func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient { } } -func (c *TestWriteClient) setSteps(steps []contentNegotiationStep) { - c.steps = steps - c.currstep = -1 // incremented by GetLastRWHeader() - c.retry = false -} - func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { c.mtx.Lock() defer c.mtx.Unlock() @@ -1108,21 +959,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r return err } - attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression) - - if attemptNos > 0 { - // If this is a second attempt then we need to bump to the next step otherwise we loop. - c.currstep++ - } - - // Check if we've been told to return something for this config. - if len(c.steps) > 0 { - if err = c.steps[c.currstep].behaviour; err != nil { - c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err)) - return err - } - } - var reqProto *prompb.WriteRequest switch rwFormat { case Version1: @@ -1136,10 +972,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r } } - if err != nil { - c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err)) - return err - } builder := labels.NewScratchBuilder(0) count := 0 for _, ts := range reqProto.Timeseries { @@ -1165,7 +997,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r } c.writesReceived++ - c.sendAttempts = append(c.sendAttempts, attemptString+",ok") return nil } @@ -1177,20 +1008,6 @@ func (c *TestWriteClient) Endpoint() string { return "http://test-remote.com/1234" } -func (c *TestWriteClient) probeRemoteVersions(_ context.Context) error { - return nil -} - -func (c *TestWriteClient) GetLastRWHeader() string { - c.mtx.Lock() - defer c.mtx.Unlock() - c.currstep++ - if len(c.steps) > 0 { - return c.steps[c.currstep].lastRWHeader - } - return "2.0;snappy,0.1.0" -} - // TestBlockingWriteClient is a queue_manager WriteClient which will block // on any calls to Store(), until the request's Context is cancelled, at which // point the `numCalls` property will contain a count of how many times Store() @@ -1221,14 +1038,6 @@ func (c *TestBlockingWriteClient) Endpoint() string { return "http://test-remote-blocking.com/1234" } -func (c *TestBlockingWriteClient) probeRemoteVersions(_ context.Context) error { - return nil -} - -func (c *TestBlockingWriteClient) GetLastRWHeader() string { - return "2.0;snappy,0.1.0" -} - // For benchmarking the send and not the receive side. type NopWriteClient struct{} @@ -1238,10 +1047,6 @@ func (c *NopWriteClient) Store(context.Context, []byte, int, config.RemoteWriteF } func (c *NopWriteClient) Name() string { return "nopwriteclient" } func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } -func (c *NopWriteClient) probeRemoteVersions(_ context.Context) error { - return nil -} -func (c *NopWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" } type MockWriteClient struct { StoreFunc func(context.Context, []byte, int) error @@ -1255,14 +1060,6 @@ func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int, _ config. func (c *MockWriteClient) Name() string { return c.NameFunc() } func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() } -// TODO(bwplotka): Mock it if needed. -func (c *MockWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" } - -// TODO(bwplotka): Mock it if needed. -func (c *MockWriteClient) probeRemoteVersions(_ context.Context) error { - return nil -} - // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. var extraLabels []labels.Label = []labels.Label{ {Name: "kubernetes_io_arch", Value: "amd64"}, diff --git a/storage/remote/write.go b/storage/remote/write.go index 3f544456dd..107776be0d 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -191,21 +191,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { delete(rws.queues, hash) continue } - - // Work out what protocol and compression to use for this endpoint. - // Default to Remote Write Version1. + // TODO(alexg): Remote Write version for this endpoint to come from config rwFormat := Version1 - switch rwConf.ProtocolVersion { - case Version1: - // We use the standard value as there's no negotiation to be had. - case Version2: - rwFormat = Version2 - // If this newer remote write format is enabled then we need to probe the remote server - // to work out the desired protocol version and compressions. - // The value of the header is kept in the client so no need to see it here. - _ = c.probeRemoteVersions(context.Background()) - // We ignore any error here, at some point we may choose to log it. - } // Redacted to remove any passwords in the URL (that are // technically accepted but not recommended) since this is diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 1236ccc715..5851c5b18a 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -63,46 +63,6 @@ func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string { return ret } -type writeHeadHandler struct { - logger log.Logger - - remoteWriteHeadRequests prometheus.Counter - - // Experimental feature, new remote write proto format. - // The handler will accept the new format, but it can still accept the old one. - rwFormat config.RemoteWriteFormat -} - -func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat) http.Handler { - h := &writeHeadHandler{ - logger: logger, - rwFormat: rwFormat, - remoteWriteHeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "prometheus", - Subsystem: "api", - Name: "remote_write_head_requests", - Help: "The number of remote write HEAD requests.", - }), - } - if reg != nil { - reg.MustRegister(h.remoteWriteHeadRequests) - } - return h -} - -// Send a response to the HEAD request based on the format supported. -func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Add appropriate header values for the specific rwFormat. - for hName, hValue := range rwHeaderNameValues(h.rwFormat) { - w.Header().Set(hName, hValue) - } - - // Increment counter - h.remoteWriteHeadRequests.Inc() - - w.WriteHeader(http.StatusOK) -} - type writeHandler struct { logger log.Logger appendable storage.Appendable @@ -155,8 +115,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { default: // We have a version in the header but it is not one we recognise. level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) - // Return a 406 so that the client can choose a more appropriate protocol to use. - http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) + // Return a 415 so that the client can choose a more appropriate protocol to use. + http.Error(w, "Unknown remote write version in headers", http.StatusUnsupportedMediaType) return } @@ -186,8 +146,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } default: level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding) - // Return a 406 so that the client can choose a more appropriate protocol to use. - http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable) + // Return a 415 so that the client can choose a more appropriate protocol to use. + http.Error(w, "Unsupported Content-Encoding", http.StatusUnsupportedMediaType) return } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 9919f9ee3f..b34f8a5bfc 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -39,25 +39,8 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -func TestRemoteWriteHeadHandler(t *testing.T) { - handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2) - - req, err := http.NewRequest(http.MethodHead, "", nil) - require.NoError(t, err) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusOK, resp.StatusCode) - - // Check header is expected value. - protHeader := resp.Header.Get(RemoteWriteVersionHeader) - require.Equal(t, "2.0;snappy,0.1.0", protHeader) -} - func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) { - // Send a v2 request without a "Content-Encoding:" header -> 406. + // Send a v2 request without a "Content-Encoding:" header -> 415. buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) @@ -74,12 +57,12 @@ func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) { handler.ServeHTTP(recorder, req) resp := recorder.Result() - // Should give us a 406. - require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) + // Should give us a 415. + require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode) } func TestRemoteWriteHandlerInvalidCompression(t *testing.T) { - // Send a v2 request without an unhandled compression scheme -> 406. + // Send a v2 request without an unhandled compression scheme -> 415. buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) @@ -95,12 +78,12 @@ func TestRemoteWriteHandlerInvalidCompression(t *testing.T) { handler.ServeHTTP(recorder, req) resp := recorder.Result() - // Expect a 406. - require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) + // Expect a 415. + require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode) } func TestRemoteWriteHandlerInvalidVersion(t *testing.T) { - // Send a protocol version number that isn't recognised/supported -> 406. + // Send a protocol version number that isn't recognised/supported -> 415. buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) @@ -115,8 +98,8 @@ func TestRemoteWriteHandlerInvalidVersion(t *testing.T) { handler.ServeHTTP(recorder, req) resp := recorder.Result() - // Expect a 406. - require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) + // Expect a 415. + require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode) } func TestRemoteWriteHandler(t *testing.T) {