From 04929dc61044b2b9c6e843f0a505809674213dcb Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Tue, 19 Mar 2024 17:16:20 +0000 Subject: [PATCH 01/10] first draft of content negotiation Signed-off-by: Alex Greenbank --- cmd/promtool/metrics.go | 2 +- storage/remote/client.go | 107 +++++++++++-- storage/remote/client_test.go | 6 +- storage/remote/codec_test.go | 4 +- storage/remote/queue_manager.go | 193 +++++++++++++++++------ storage/remote/queue_manager_test.go | 219 ++++++++++++++++++++++++--- storage/remote/write.go | 20 ++- storage/remote/write_handler.go | 150 +++++++++++++++--- storage/remote/write_handler_test.go | 114 ++++++++++++-- web/api/v1/api.go | 29 +++- web/api/v1/api_test.go | 40 +++++ 11 files changed, 761 insertions(+), 123 deletions(-) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 46246b672a..3117fd29c2 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -116,7 +116,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s // Encode the request body into snappy encoding. compressed := snappy.Encode(nil, raw) - err = client.Store(context.Background(), compressed, 0) + err = client.Store(context.Background(), compressed, 0, remote.Version1, "snappy") if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false diff --git a/storage/remote/client.go b/storage/remote/client.go index 943ba5cd01..09014cea24 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -17,6 +17,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "math/rand" @@ -46,6 +47,9 @@ const maxErrMsgLen = 1024 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) +var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 +var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 + var ( remoteReadQueriesTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -83,11 +87,12 @@ func init() { // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - remoteName string // Used to differentiate clients in metrics. - urlString string // url.String() - rwFormat config.RemoteWriteFormat // For write clients, ignored for read clients. - Client *http.Client - timeout time.Duration + remoteName string // Used to differentiate clients in metrics. + urlString string // url.String() + rwFormat config.RemoteWriteFormat // For write clients, ignored for read clients. + lastRWHeader string + Client *http.Client + timeout time.Duration retryOnRateLimit bool @@ -199,9 +204,53 @@ type RecoverableError struct { retryAfter model.Duration } +// Attempt a HEAD request against a remote write endpoint to see what it supports. +func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { + // If we are in Version1 mode then don't even bother + if c.rwFormat == Version1 { + return RemoteWriteVersion1HeaderValue, nil + } + + httpReq, err := http.NewRequest("HEAD", c.urlString, nil) + if err != nil { + // Errors from NewRequest are from unparsable URLs, so are not + // recoverable. + return "", err + } + + // Set the version header to be nice + httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + httpReq.Header.Set("User-Agent", UserAgent) + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) + if err != nil { + // We don't attempt a retry here + return "", err + } + + // See if we got a header anyway + promHeader := httpResp.Header.Get(RemoteWriteVersionHeader) + + // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank + if promHeader != "" { + c.lastRWHeader = promHeader + } + + // Check for an error + if httpResp.StatusCode != 200 { + return promHeader, fmt.Errorf(httpResp.Status) + } + + // All ok, return header and no error + return promHeader, nil +} + // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // and encoded bytes from codec.go. -func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { +func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat config.RemoteWriteFormat, compression string) error { httpReq, err := http.NewRequest("POST", c.urlString, bytes.NewReader(req)) if err != nil { // Errors from NewRequest are from unparsable URLs, so are not @@ -209,15 +258,15 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { return err } - httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Add("Content-Encoding", compression) httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", UserAgent) - if c.rwFormat == Version1 { + if rwFormat == Version1 { httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue) } else { - // Set the right header if we're using v1.1 remote write protocol - httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion2HeaderValue) + // Set the right header if we're using v2.0 remote write protocol + httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) } if attempt > 0 { @@ -241,7 +290,12 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpResp.Body.Close() }() - // TODO-RW11: Here is where we need to handle version downgrade on error + // See if we got a X-Prometheus-Remote-Write header in the response + if promHeader := httpResp.Header.Get(RemoteWriteVersionHeader); promHeader != "" { + // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank + // (It's blank if it wasn't present, we don't care about that distinction.) + c.lastRWHeader = promHeader + } if httpResp.StatusCode/100 != 2 { scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) @@ -249,7 +303,22 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { if scanner.Scan() { line = scanner.Text() } - err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) + switch httpResp.StatusCode { + case 400: + // Return an unrecoverable error to indicate the 400 + // This then gets passed up the chain so we can react to it properly + // TODO(alexg) Do we want to include the first line of the message? + return ErrStatusBadRequest + case 406: + // Return an unrecoverable error to indicate the 406 + // This then gets passed up the chain so we can react to it properly + // TODO(alexg) Do we want to include the first line of the message? + // TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error? + return ErrStatusNotAcceptable + default: + // We want to end up returning a non-specific error + err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) + } } if httpResp.StatusCode/100 == 5 || (c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) { @@ -284,6 +353,10 @@ func (c Client) Endpoint() string { return c.urlString } +func (c *Client) GetLastRWHeader() string { + return c.lastRWHeader +} + // Read reads from a remote endpoint. func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { c.readQueries.Inc() @@ -366,7 +439,11 @@ func NewTestClient(name, url string) WriteClient { return &TestClient{name: name, url: url} } -func (c *TestClient) Store(_ context.Context, req []byte, _ int) error { +func (c *TestClient) GetProtoVersions(_ context.Context) (string, error) { + return "2.0;snappy,0.1.0", nil +} + +func (c *TestClient) Store(_ context.Context, req []byte, _ int, _ config.RemoteWriteFormat, _ string) error { r := rand.Intn(200-100) + 100 time.Sleep(time.Duration(r) * time.Millisecond) return nil @@ -379,3 +456,7 @@ func (c *TestClient) Name() string { func (c *TestClient) Endpoint() string { return c.url } + +func (c *TestClient) GetLastRWHeader() string { + return "2.0;snappy,0.1.0" +} diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 2acb8e279a..1711f0fdfa 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { c, err := NewWriteClient(hash, conf) require.NoError(t, err) - err = c.Store(context.Background(), []byte{}, 0) + err = c.Store(context.Background(), []byte{}, 0, Version1, "snappy") if test.err != nil { require.EqualError(t, err, test.err.Error()) } else { @@ -133,7 +133,7 @@ func TestClientRetryAfter(t *testing.T) { c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit)) var recErr RecoverableError - err = c.Store(context.Background(), []byte{}, 0) + err = c.Store(context.Background(), []byte{}, 0, Version1, "snappy") require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.") if tc.expectedRecoverable { require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter) @@ -203,7 +203,7 @@ func TestClientHeaders(t *testing.T) { c, err := NewWriteClient("c", conf) require.NoError(t, err) - err = c.Store(context.Background(), []byte{}, 0) + err = c.Store(context.Background(), []byte{}, 0, Version1, "snappy") require.NoError(t, err) require.True(t, called, "The remote server wasn't called") diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 750fb612c2..608f5cc2b4 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -559,7 +559,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) { } func TestDecodeWriteRequest(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) actual, err := DecodeWriteRequest(bytes.NewReader(buf)) @@ -568,7 +568,7 @@ func TestDecodeWriteRequest(t *testing.T) { } func TestDecodeMinWriteRequest(t *testing.T) { - buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil) + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) actual, err := DecodeMinimizedWriteRequestStr(bytes.NewReader(buf)) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 72cbb7cad6..c5b2655835 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -16,8 +16,10 @@ package remote import ( "context" "errors" + "fmt" "math" "strconv" + "strings" "sync" "time" @@ -387,11 +389,15 @@ func (m *queueManagerMetrics) unregister() { // external timeseries database. type WriteClient interface { // Store stores the given samples in the remote storage. - Store(ctx context.Context, req []byte, retryAttempt int) error + Store(ctx context.Context, req []byte, retryAttempt int, rwFormat config.RemoteWriteFormat, compression string) error // Name uniquely identifies the remote storage. Name() string // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string + // Get the protocol versions supported by the endpoint + GetProtoVersions(ctx context.Context) (string, error) + // Get the last RW header received from the endpoint + GetLastRWHeader() string } const ( @@ -416,8 +422,7 @@ type QueueManager struct { sendNativeHistograms bool watcher *wlog.Watcher metadataWatcher *MetadataWatcher - // experimental feature, new remote write proto format - rwFormat config.RemoteWriteFormat + rwFormat config.RemoteWriteFormat clientMtx sync.RWMutex storeClient WriteClient @@ -490,9 +495,7 @@ func NewQueueManager( storeClient: client, sendExemplars: enableExemplarRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite, - // TODO: we should eventually set the format via content negotiation, - // so this field would be the desired format, maybe with a fallback? - rwFormat: rwFormat, + rwFormat: rwFormat, seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata), @@ -572,7 +575,11 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples. - req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil) + + // Get compression to use from content negotiation based on last header seen (defaults to snappy) + compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader()) + + req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression) if err != nil { return err } @@ -595,7 +602,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p } begin := time.Now() - err := t.storeClient.Store(ctx, req, try) + err := t.storeClient.Store(ctx, req, try, Version1, compression) t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { @@ -1491,6 +1498,41 @@ func (q *queue) newBatch(capacity int) []timeSeries { return make([]timeSeries, 0, capacity) } +func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) { + if rwFormat == Version1 { + // If we're only handling Version1 then all we can do is that with snappy compression + return "snappy", Version1 + } + if rwFormat != Version2 { + // If we get here then someone has added a new RemoteWriteFormat value but hasn't + // fixed this function to handle it + // panic! + panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat)) + } + if lastHeaderSeen == "" { + // We haven't had a valid header, so we just default to 0.1.0/snappy + return "snappy", Version1 + } + // We can currently handle: + // "2.0;snappy" + // "0.1.0" - implicit compression of snappy + // lastHeaderSeen should contain a list of tuples + // If we find a match to something we can handle then we can return that + for _, tuple := range strings.Split(lastHeaderSeen, ",") { + // Remove spaces from the tuple + curr := strings.ReplaceAll(tuple, " ", "") + switch curr { + case "2.0;snappy": + return "snappy", Version2 + case "0.1.0": + return "snappy", Version1 + } + } + + // Otherwise we have to default to "0.1.0" + return "snappy", Version1 +} + func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { defer func() { if s.running.Dec() == 0 { @@ -1541,6 +1583,26 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } defer stop() + attemptBatchSend := func(batch []timeSeries, rwFormat config.RemoteWriteFormat, compression string, timer bool) error { + switch rwFormat { + case Version1: + nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms) + n := nPendingSamples + nPendingExemplars + nPendingHistograms + if timer { + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, + "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) + } + return s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf, compression) + case Version2: + nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms) + n := nPendingSamples + nPendingExemplars + nPendingHistograms + err := s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf, compression) + symbolTable.clear() + return err + } + return nil + } + for { select { case <-ctx.Done(): @@ -1564,16 +1626,20 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - switch s.qm.rwFormat { - case Version1: - nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms) - n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) - case Version2: - nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms) - n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf) - symbolTable.clear() + // Resend logic on 406 + // ErrStatusNotAcceptable is a new error defined in client.go + + // Work out what version to send based on the last header seen and the QM's rwFormat setting + // TODO(alexg) - see comments below about retry/renegotiate design + for attemptNos := 1; attemptNos <= 3; attemptNos++ { + lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() + compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) + sendErr := attemptBatchSend(batch, rwFormat, compression, false) + if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { + // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying + break + } + // If we get either of the two errors (406, 400) we loop and re-negotiate } queue.ReturnForReuse(batch) @@ -1584,19 +1650,23 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - switch s.qm.rwFormat { - case Version1: - nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms) - n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) - level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, - "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) - case Version2: - nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms) - n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf) - symbolTable.clear() + for attemptNos := 1; attemptNos <= 3; attemptNos++ { + // Work out what version to send based on the last header seen and the QM's rwFormat setting + lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() + compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) + sendErr := attemptBatchSend(batch, rwFormat, compression, true) + if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { + // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying + break + } + // If we get either of the two errors (406, 400) we loop and re-negotiate } + // TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we + // Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format + // Q: Do we always try downgrading to 1.0 at least once? + // Q: Do we limit our attempts to only try a particular protocol/encoding tuple once? + // Q: Is 3 a suitable limit? + // TODO(alexg) - add retry/renegotiate metrics here } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1645,16 +1715,22 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, compression string) error { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf) + err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin)) + + // Return the error in case it is a 406 and we need to reformat the data + return err } -func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) { +func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, compression string) error { begin := time.Now() - err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf) + err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin)) + + // Return the error in case it is a 406 and we need to reformat the data + return err } func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) { @@ -1681,9 +1757,9 @@ func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exem } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, compression string) error { // Build the WriteRequest with no metadata. - req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil) + req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, compression) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will @@ -1709,6 +1785,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti pBuf, buf, isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), + compression, ) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { @@ -1740,7 +1817,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) s.qm.metrics.metadataTotal.Add(float64(metadataCount)) - err := s.qm.client().Store(ctx, *buf, try) + err := s.qm.client().Store(ctx, *buf, try, Version1, compression) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { @@ -1771,9 +1848,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti } // sendV2Samples to the remote storage with backoff for recoverable errors. -func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) error { +func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, compression string) error { // Build the WriteRequest with no metadata. - req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil) + req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, compression) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will @@ -1799,6 +1876,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 pBuf, buf, isV2TimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), + compression, ) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { @@ -1830,7 +1908,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) s.qm.metrics.metadataTotal.Add(float64(metadataCount)) - err := s.qm.client().Store(ctx, *buf, try) + err := s.qm.client().Store(ctx, *buf, try, Version2, compression) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { @@ -2017,7 +2095,7 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms } -func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) { +func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, compression string) ([]byte, int64, int64, error) { highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) @@ -2047,10 +2125,18 @@ func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metada } else { buf = &[]byte{} } - compressed := snappy.Encode(*buf, pBuf.Bytes()) - if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) + + var compressed []byte + + switch compression { + case "snappy": + compressed = snappy.Encode(*buf, pBuf.Bytes()) + if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) { + // grow the buffer for the next time + *buf = make([]byte, n) + } + default: + return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression) } return compressed, highest, lowest, nil } @@ -2087,7 +2173,7 @@ func (r *rwSymbolTable) clear() { } } -func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool) ([]byte, int64, int64, error) { +func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool, compression string) ([]byte, int64, int64, error) { highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter) if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { @@ -2117,10 +2203,17 @@ func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels buf = &[]byte{} } - compressed := snappy.Encode(*buf, data) - if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) + var compressed []byte + + switch compression { + case "snappy": + compressed = snappy.Encode(*buf, data) + if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) { + // grow the buffer for the next time + *buf = make([]byte, n) + } + default: + return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression) } return compressed, highest, lowest, nil diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index dcf4a8302a..7fac789166 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -63,6 +63,136 @@ func newHighestTimestampMetric() *maxTimestamp { } } +type contentNegotiationStep struct { + lastRWHeader string + compression string + behaviour error // or nil + attemptString string +} + +func TestContentNegotiation(t *testing.T) { + testcases := []struct { + name string + success bool + qmRwFormat config.RemoteWriteFormat + rwFormat config.RemoteWriteFormat + steps []contentNegotiationStep + }{ + // Test a simple case where the v2 request we send is processed first time + {success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, + }, + }, + // Test a simple case where the v1 request we send is processed first time + {success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, + }, + }, + // Test a case where the v1 request has a temporary delay but goes through on retry + // There is no content re-negotiation between first and retry attempts + {success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, + }, + }, + // Repeat the above test but with v2. The request has a temporary delay but goes through on retry + // There is no content re-negotiation between first and retry attempts + {success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, + }, + }, + // Now test where the server suddenly stops speaking 2.0 and we need to downgrade + {success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, + }, + }, + // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400 + {success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, + }, + }, + // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only + {success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries) + }, + }, + } + + queueConfig := config.DefaultQueueConfig + queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond) + queueConfig.MaxShards = 1 + + // We need to set URL's so that metric creation doesn't panic. + writeConfig := baseRemoteWriteConfig("http://test-storage.com") + writeConfig.QueueConfig = queueConfig + writeConfig.SendExemplars = true // ALEXG - need? + writeConfig.SendNativeHistograms = true // ALEXG - need? + + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + writeConfig, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true) + defer s.Close() + + var ( + series []record.RefSeries + metadata []record.RefMetadata + samples []record.RefSample + ) + + // Generates same series in both cases. + samples, series = createTimeseries(1, 1) + metadata = createSeriesMetadata(series) + + // Apply new config. + queueConfig.Capacity = len(samples) + queueConfig.MaxSamplesPerSend = len(samples) + // For now we only ever have a single rw config in this test. + conf.RemoteWriteConfigs[0].ProtocolVersion = tc.qmRwFormat + require.NoError(t, s.ApplyConfig(conf)) + hash, err := toHash(writeConfig) + require.NoError(t, err) + qm := s.rws.queues[hash] + + c := NewTestWriteClient(tc.rwFormat) + c.setSteps(tc.steps) // set expected behaviour + qm.SetClient(c) + + qm.StoreSeries(series, 0) + qm.StoreMetadata(metadata) + + // Did we expect some data back? + if tc.success { + c.expectSamples(samples, series) + } + qm.Append(samples) + if !tc.success { + // We just need to sleep for a bit to give it time to run + time.Sleep(2 * time.Second) + } else { + c.waitForExpectedData(t, 2*time.Second) + } + require.Equal(t, len(c.sendAttempts), len(tc.steps)) + for i, s := range c.sendAttempts { + require.Equal(t, s, tc.steps[i].attemptString) + } + }) + } +} + func TestSampleDelivery(t *testing.T) { testcases := []struct { name string @@ -78,7 +208,7 @@ func TestSampleDelivery(t *testing.T) { {samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"}, {samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"}, - // TODO: update some portion of this test to check for the 2.0 metadata + // TODO(alexg): update some portion of this test to check for the 2.0 metadata {samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only", rwFormat: Version2}, {samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms", rwFormat: Version2}, {samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only", rwFormat: Version2}, @@ -140,8 +270,6 @@ func TestSampleDelivery(t *testing.T) { // Apply new config. queueConfig.Capacity = len(samples) queueConfig.MaxSamplesPerSend = len(samples) / 2 - // For now we only ever have a single rw config in this test. - conf.RemoteWriteConfigs[0].ProtocolVersion = tc.rwFormat require.NoError(t, s.ApplyConfig(conf)) hash, err := toHash(writeConfig) require.NoError(t, err) @@ -233,7 +361,7 @@ func (c *perRequestWriteClient) expectedData(t testing.TB) { } } -func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) error { +func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int, rwFormat config.RemoteWriteFormat, compression string) error { c.mtx.Lock() defer c.mtx.Unlock() defer func() { c.i++ }() @@ -241,7 +369,7 @@ func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) er return nil } - if err := c.TestWriteClient.Store(ctx, req, r); err != nil { + if err := c.TestWriteClient.Store(ctx, req, r, rwFormat, compression); err != nil { return err } @@ -271,7 +399,7 @@ func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) er } c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries) c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...) - return c.requests[c.i].Store(ctx, req, r) + return c.requests[c.i].Store(ctx, req, r, rwFormat, compression) } func testDefaultQueueConfig() config.QueueConfig { @@ -362,7 +490,7 @@ func TestMetadataDelivery(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, 0) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1) m.Start() defer m.Stop() @@ -970,6 +1098,10 @@ type TestWriteClient struct { mtx sync.Mutex buf []byte rwFormat config.RemoteWriteFormat + sendAttempts []string + steps []contentNegotiationStep + currstep int + retry bool } func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient { @@ -981,6 +1113,12 @@ func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient { } } +func (c *TestWriteClient) setSteps(steps []contentNegotiationStep) { + c.steps = steps + c.currstep = -1 // incremented by GetLastRWHeader() + c.retry = false +} + func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { c.mtx.Lock() defer c.mtx.Unlock() @@ -1087,7 +1225,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Durati } } -func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { +func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, rwFormat config.RemoteWriteFormat, compression string) error { c.mtx.Lock() defer c.mtx.Unlock() // nil buffers are ok for snappy, ignore cast error. @@ -1100,8 +1238,23 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { return err } + attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression) + + if attemptNos > 0 { + // If this is a second attempt then we need to bump to the next step otherwise we loop + c.currstep++ + } + + // Check if we've been told to return something for this config + if len(c.steps) > 0 { + if err = c.steps[c.currstep].behaviour; err != nil { + c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err)) + return err + } + } + var reqProto *prompb.WriteRequest - switch c.rwFormat { + switch rwFormat { case Version1: reqProto = &prompb.WriteRequest{} err = proto.Unmarshal(reqBuf, reqProto) @@ -1114,6 +1267,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { } if err != nil { + c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err)) return err } @@ -1139,6 +1293,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { } c.writesReceived++ + c.sendAttempts = append(c.sendAttempts, attemptString+",ok") return nil } @@ -1150,6 +1305,18 @@ func (c *TestWriteClient) Endpoint() string { return "http://test-remote.com/1234" } +func (c *TestWriteClient) GetProtoVersions(_ context.Context) (string, error) { + return "2.0;snappy,0.1.0", nil +} + +func (c *TestWriteClient) GetLastRWHeader() string { + c.currstep++ + if len(c.steps) > 0 { + return c.steps[c.currstep].lastRWHeader + } + return "2.0;snappy,0.1.0" +} + // TestBlockingWriteClient is a queue_manager WriteClient which will block // on any calls to Store(), until the request's Context is cancelled, at which // point the `numCalls` property will contain a count of how many times Store() @@ -1162,7 +1329,7 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient { return &TestBlockingWriteClient{} } -func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error { +func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int, _ config.RemoteWriteFormat, _ string) error { c.numCalls.Inc() <-ctx.Done() return nil @@ -1180,13 +1347,27 @@ func (c *TestBlockingWriteClient) Endpoint() string { return "http://test-remote-blocking.com/1234" } +func (c *TestBlockingWriteClient) GetProtoVersions(_ context.Context) (string, error) { + return "2.0;snappy,0.1.0", nil +} + +func (c *TestBlockingWriteClient) GetLastRWHeader() string { + return "2.0;snappy,0.1.0" +} + // For benchmarking the send and not the receive side. type NopWriteClient struct{} -func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} } -func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil } -func (c *NopWriteClient) Name() string { return "nopwriteclient" } -func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } +func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} } +func (c *NopWriteClient) Store(context.Context, []byte, int, config.RemoteWriteFormat, string) error { + return nil +} +func (c *NopWriteClient) Name() string { return "nopwriteclient" } +func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } +func (c *NopWriteClient) GetProtoVersions(_ context.Context) (string, error) { + return "2.0;snappy,0.1.0", nil +} +func (c *NopWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" } // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. var extraLabels []labels.Label = []labels.Label{ @@ -1777,14 +1958,14 @@ func BenchmarkBuildWriteRequest(b *testing.B) { // Warmup buffers for i := 0; i < 10; i++ { populateTimeSeries(batch, seriesBuff, true, true) - buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil) + buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy") } b.ResetTimer() totalSize := 0 for i := 0; i < b.N; i++ { populateTimeSeries(batch, seriesBuff, true, true) - req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil) + req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy") if err != nil { b.Fatal(err) } @@ -1833,7 +2014,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { // Warmup buffers for i := 0; i < 10; i++ { populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) - buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil) + buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil, "snappy") } b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { @@ -1841,7 +2022,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { for j := 0; j < b.N; j++ { populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) b.ResetTimer() - req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil) + req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil, "snappy") if err != nil { b.Fatal(err) } @@ -1859,7 +2040,7 @@ func TestDropOldTimeSeries(t *testing.T) { nSamples := config.DefaultQueueConfig.Capacity * size samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries) - // TODO: test with new version + // TODO(alexg): test with new version c := NewTestWriteClient(Version1) c.expectSamples(newSamples, series) diff --git a/storage/remote/write.go b/storage/remote/write.go index 8d584530f4..88a71cce45 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -192,6 +192,24 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { continue } + // Work out what protocol and compression to use for this endpoint + // Default to Remote Write Version1 + rwFormat := Version1 + switch rwConf.ProtocolVersion { + case Version1: + // We use the standard value as there's no negotiation to be had + case Version2: + // If this newer remote write format is enabled then we need to probe the remote server + // to work out the desired protocol version and compressions + // The value of the header is kept in the client so no need to see it here + rwFormat = Version2 + _, err := c.GetProtoVersions(context.Background()) // TODO(alexg) - better ctx to pass? + if err != nil { + // TODO(alexg) - Log an error based on this? + // TODO(alexg) - if we get 405 (MethodNotAllowed) then we should default to 1.0 (and downgrade rwFormat)? + } + } + // Redacted to remove any passwords in the URL (that are // technically accepted but not recommended) since this is // only used for metric labels. @@ -214,7 +232,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, - rwConf.ProtocolVersion, + rwFormat, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index fe7afbfa3e..7ace752e55 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -17,11 +17,16 @@ import ( "context" "errors" "fmt" + "io" "net/http" + "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" @@ -34,11 +39,68 @@ import ( ) const ( - RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" - RemoteWriteVersion1HeaderValue = "0.1.0" - RemoteWriteVersion2HeaderValue = "2.0" + RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" + RemoteWriteVersion1HeaderValue = "0.1.0" + RemoteWriteVersion20HeaderValue = "2.0" ) +func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string { + // Return the correct remote write header name/values based on provided rwFormat + ret := make(map[string]string, 1) + + switch rwFormat { + case Version1: + ret[RemoteWriteVersionHeader] = RemoteWriteVersion1HeaderValue + case Version2: + // We need to add the supported protocol definitions in order: + tuples := make([]string, 0, 2) + // Add 2.0;snappy + tuples = append(tuples, RemoteWriteVersion20HeaderValue+";snappy") + // Add default 0.1.0 + tuples = append(tuples, RemoteWriteVersion1HeaderValue) + ret[RemoteWriteVersionHeader] = strings.Join(tuples, ",") + } + return ret +} + +type writeHeadHandler struct { + logger log.Logger + + remoteWrite20HeadRequests prometheus.Counter + + // Experimental feature, new remote write proto format + // The handler will accept the new format, but it can still accept the old one + rwFormat config.RemoteWriteFormat +} + +func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat) http.Handler { + h := &writeHeadHandler{ + logger: logger, + rwFormat: rwFormat, + remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "api", + Name: "remote_write_20_head_requests", + Help: "The number of remote write 2.0 head requests.", + }), + } + if reg != nil { + reg.MustRegister(h.remoteWrite20HeadRequests) + } + return h +} + +func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Send a response to the HEAD request based on the format supported + + // Add appropriate header values for the specific rwFormat + for hName, hValue := range rwHeaderNameValues(h.rwFormat) { + w.Header().Set(hName, hValue) + } + + w.WriteHeader(http.StatusOK) +} + type writeHandler struct { logger log.Logger appendable storage.Appendable @@ -47,7 +109,6 @@ type writeHandler struct { // Experimental feature, new remote write proto format // The handler will accept the new format, but it can still accept the old one - // TODO: this should eventually be via content negotiation? rwFormat config.RemoteWriteFormat } @@ -73,29 +134,82 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var err error - var req *prompb.WriteRequest - var reqMinStr *writev2.WriteRequest - // TODO: this should eventually be done via content negotiation/looking at the header - switch h.rwFormat { - case Version1: - req, err = DecodeWriteRequest(r.Body) - case Version2: - reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body) + // Set the header(s) in the response based on the rwFormat the server supports + for hName, hValue := range rwHeaderNameValues(h.rwFormat) { + w.Header().Set(hName, hValue) } + // Parse the headers to work out how to handle this + contentEncoding := r.Header.Get("Content-Encoding") + protoVer := r.Header.Get(RemoteWriteVersionHeader) + + switch protoVer { + case "": + // No header provided, assume 0.1.0 as everything that relies on later + protoVer = RemoteWriteVersion1HeaderValue + case RemoteWriteVersion1HeaderValue, RemoteWriteVersion20HeaderValue: + // We know this header, woo + default: + // We have a version in the header but it is not one we recognise + // TODO(alexg) - make a proper error for this? + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) + // Return a 406 so that the client can choose a more appropriate protocol to use + http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) + return + } + + // Deal with 0.1.0 clients that forget to send Content-Encoding + if protoVer == RemoteWriteVersion1HeaderValue && contentEncoding == "" { + contentEncoding = "snappy" + } + + // Read the request body + body, err := io.ReadAll(r.Body) if err != nil { level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } - // TODO: this should eventually be done detecting the format version above - switch h.rwFormat { - case Version1: - err = h.write(r.Context(), req) - case Version2: - err = h.writeMinStr(r.Context(), reqMinStr) + // Deal with contentEncoding first + var decompressed []byte + + switch contentEncoding { + case "snappy": + decompressed, err = snappy.Decode(nil, body) + if err != nil { + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + default: + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding) + // Return a 406 so that the client can choose a more appropriate protocol to use + http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable) + return + } + + // Now we have a decompressed buffer we can unmarshal it + // At this point we are happy with the version but need to check the encoding + switch protoVer { + case RemoteWriteVersion1HeaderValue: + var req prompb.WriteRequest + if err := proto.Unmarshal(decompressed, &req); err != nil { + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = h.write(r.Context(), &req) + case RemoteWriteVersion20HeaderValue: + // 2.0 request + var reqMinStr writev2.WriteRequest + if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil { + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = h.writeMinStr(r.Context(), &reqMinStr) } switch { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 621f8aa9f3..c230117976 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -39,15 +39,94 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) +func TestRemoteWriteHeadHandler(t *testing.T) { + handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2) + + req, err := http.NewRequest(http.MethodHead, "", nil) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusOK, resp.StatusCode) + + // Check header is expected value + protHeader := resp.Header.Get(RemoteWriteVersionHeader) + require.Equal(t, "2.0;snappy,0.1.0", protHeader) +} + +func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) { + // Send a v2 request without a "Content-Encoding:" header -> 406 + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + // Do not provide "Content-Encoding: snappy" header + // req.Header.Set("Content-Encoding", "snappy") + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + // Should give us a 406 + require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) +} + +func TestRemoteWriteHandlerInvalidCompression(t *testing.T) { + // Send a v2 request without an unhandled compression scheme -> 406 + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + req.Header.Set("Content-Encoding", "zstd") + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + // Expect a 406 + require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) +} + +func TestRemoteWriteHandlerInvalidVersion(t *testing.T) { + // Send a protocol version number that isn't recognised/supported -> 406 + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + req.Header.Set(RemoteWriteVersionHeader, "3.0") + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + // Expect a 406 + require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) +} + func TestRemoteWriteHandler(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) require.NoError(t, err) appendable := &mockAppendable{} - // TODO: test with other proto format(s) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1) recorder := httptest.NewRecorder() @@ -56,6 +135,10 @@ func TestRemoteWriteHandler(t *testing.T) { resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) + // Check header is expected value + protHeader := resp.Header.Get(RemoteWriteVersionHeader) + require.Equal(t, "0.1.0", protHeader) + i := 0 j := 0 k := 0 @@ -87,16 +170,17 @@ func TestRemoteWriteHandler(t *testing.T) { } func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { - buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil) + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion2HeaderValue) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + // Must provide "Content-Encoding: snappy" header + req.Header.Set("Content-Encoding", "snappy") require.NoError(t, err) appendable := &mockAppendable{} - // TODO: test with other proto format(s) - handler := NewWriteHandler(nil, nil, appendable, Version2) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -104,6 +188,10 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) + // Check header is expected value + protHeader := resp.Header.Get(RemoteWriteVersionHeader) + require.Equal(t, "2.0;snappy,0.1.0", protHeader) + i := 0 j := 0 k := 0 @@ -142,7 +230,7 @@ func TestOutOfOrderSample(t *testing.T) { buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil, nil, nil) + }}, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -168,7 +256,7 @@ func TestOutOfOrderExemplar(t *testing.T) { buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil, nil, nil) + }}, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -192,7 +280,7 @@ func TestOutOfOrderHistogram(t *testing.T) { buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, - }}, nil, nil, nil, nil) + }}, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -222,7 +310,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { {Name: "test_label_name_" + num, Value: labelValue + num}, }, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, - }}, nil, nil, nil, nil) + }}, nil, nil, nil, nil, "snappy") require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) require.NoError(b, err) @@ -241,7 +329,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { } func TestCommitErr(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -280,7 +368,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { // TODO: test with other proto format(s) handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Version1) - buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -293,7 +381,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { var bufRequests [][]byte for i := 0; i < 100; i++ { - buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil) + buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) bufRequests = append(bufRequests, buf) } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 4d71ed93d0..203ed6d147 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -217,9 +217,10 @@ type API struct { isAgent bool statsRenderer StatsRenderer - remoteWriteHandler http.Handler - remoteReadHandler http.Handler - otlpWriteHandler http.Handler + remoteWriteHeadHandler http.Handler + remoteWriteHandler http.Handler + remoteReadHandler http.Handler + otlpWriteHandler http.Handler codecs []Codec } @@ -296,7 +297,20 @@ func NewAPI( } if rwEnabled { + // TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising + // For rollout we do two phases: + // 0. (Before) no flags set + // 1. (During) support new protocols but don't advertise + // + // 2. (After) support new protocols and advertise + // + // For rollback the two phases are: + // 0. (Before) support new protocols and advertise + // 1. (During) support new protocols but don't advertise + // + // 2. (After) no flags set a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat) + a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, rwFormat) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) @@ -393,6 +407,7 @@ func (api *API) Register(r *route.Router) { r.Get("/status/walreplay", api.serveWALReplayStatus) r.Post("/read", api.ready(api.remoteRead)) r.Post("/write", api.ready(api.remoteWrite)) + r.Head("/write", api.remoteWriteHead) r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite)) r.Get("/alerts", wrapAgent(api.alerts)) @@ -1616,6 +1631,14 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } +func (api *API) remoteWriteHead(w http.ResponseWriter, r *http.Request) { + if api.remoteWriteHeadHandler != nil { + api.remoteWriteHeadHandler.ServeHTTP(w, r) + } else { + http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound) + } +} + func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { if api.remoteWriteHandler != nil { api.remoteWriteHandler.ServeHTTP(w, r) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 5a0f8a7aba..44f1805610 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -337,6 +337,46 @@ var sampleFlagMap = map[string]string{ "flag2": "value2", } +func TestHeadEndpoint(t *testing.T) { + for _, tc := range []struct { + name string + rwFormat config.RemoteWriteFormat + expectedStatusCode int + expectedHeaderValue string + }{ + { + name: "HEAD Version 1", + rwFormat: remote.Version1, + expectedStatusCode: http.StatusOK, + expectedHeaderValue: "0.1.0", + }, + { + name: "HEAD Version 2", + rwFormat: remote.Version2, + expectedStatusCode: http.StatusOK, + expectedHeaderValue: "2.0;snappy,0.1.0", + }, + } { + r := route.New() + api := &API{remoteWriteHeadHandler: remote.NewWriteHeadHandler(log.NewNopLogger(), nil, tc.rwFormat), + ready: func(f http.HandlerFunc) http.HandlerFunc { return f }} + api.Register(r) + + s := httptest.NewServer(r) + defer s.Close() + + req, err := http.NewRequest("HEAD", s.URL+"/write", nil) + require.NoError(t, err, "Error creating HEAD request") + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err, "Error executing HEAD request") + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + promHeader := resp.Header.Get(remote.RemoteWriteVersionHeader) + require.Equal(t, tc.expectedHeaderValue, promHeader) + } +} + func TestEndpoints(t *testing.T) { storage := promql.LoadedStorage(t, ` load 1m From 766d947ba5ba854a2eb90251426cf053e409326f Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Tue, 19 Mar 2024 18:06:48 +0000 Subject: [PATCH 02/10] Lint Signed-off-by: Alex Greenbank --- storage/remote/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 09014cea24..cb3ba8cec0 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -47,7 +47,7 @@ const maxErrMsgLen = 1024 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) -var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 +var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 var ( From b884b15f39053f355135bd93381c913c2d73a6bc Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Tue, 19 Mar 2024 18:28:20 +0000 Subject: [PATCH 03/10] Fix race in test Signed-off-by: Alex Greenbank --- storage/remote/queue_manager_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7fac789166..c76143c848 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -179,12 +179,17 @@ func TestContentNegotiation(t *testing.T) { c.expectSamples(samples, series) } qm.Append(samples) + if !tc.success { // We just need to sleep for a bit to give it time to run time.Sleep(2 * time.Second) + // But we still need to check for data with no delay to avoid race + c.waitForExpectedData(t, 0*time.Second) } else { - c.waitForExpectedData(t, 2*time.Second) + // We expected data so wait for it + c.waitForExpectedData(t, 5*time.Second) } + require.Equal(t, len(c.sendAttempts), len(tc.steps)) for i, s := range c.sendAttempts { require.Equal(t, s, tc.steps[i].attemptString) From ca20eb9f321639574aa1de2f0b070dd6bf5a4bf4 Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Tue, 19 Mar 2024 19:03:22 +0000 Subject: [PATCH 04/10] Fix another test race Signed-off-by: Alex Greenbank --- storage/remote/queue_manager_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c76143c848..a707e806ad 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1315,6 +1315,8 @@ func (c *TestWriteClient) GetProtoVersions(_ context.Context) (string, error) { } func (c *TestWriteClient) GetLastRWHeader() string { + c.mtx.Lock() + defer c.mtx.Unlock() c.currstep++ if len(c.steps) > 0 { return c.steps[c.currstep].lastRWHeader From e5c5ef1a2d80616fe2798e164ad12fca925191da Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Wed, 20 Mar 2024 16:14:14 +0000 Subject: [PATCH 05/10] Almost done with lint Signed-off-by: Alex Greenbank --- storage/remote/client.go | 3 +- storage/remote/queue_manager_test.go | 63 +++++++++++++++------------- web/api/v1/api_test.go | 6 ++- 3 files changed, 41 insertions(+), 31 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index cb3ba8cec0..32f58af5b2 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -47,7 +47,8 @@ const maxErrMsgLen = 1024 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) -var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 +var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 + var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 var ( diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index a707e806ad..b0aaed4a64 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -79,48 +79,55 @@ func TestContentNegotiation(t *testing.T) { steps []contentNegotiationStep }{ // Test a simple case where the v2 request we send is processed first time - {success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, - }, + { + success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, + }, }, // Test a simple case where the v1 request we send is processed first time - {success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, - }, + { + success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, + }, }, // Test a case where the v1 request has a temporary delay but goes through on retry // There is no content re-negotiation between first and retry attempts - {success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, - }, + { + success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, + }, }, // Repeat the above test but with v2. The request has a temporary delay but goes through on retry // There is no content re-negotiation between first and retry attempts - {success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, - }, + { + success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, + }, }, // Now test where the server suddenly stops speaking 2.0 and we need to downgrade - {success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, - }, + { + success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, + }, }, // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400 - {success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, - }, + { + success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, + }, }, // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only - {success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ - {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, - {lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, - {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, - // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries) - }, + { + success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ + {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, + {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, + // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries) + }, }, } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 44f1805610..a5e9fc09ef 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -358,8 +358,10 @@ func TestHeadEndpoint(t *testing.T) { }, } { r := route.New() - api := &API{remoteWriteHeadHandler: remote.NewWriteHeadHandler(log.NewNopLogger(), nil, tc.rwFormat), - ready: func(f http.HandlerFunc) http.HandlerFunc { return f }} + api := &API{ + remoteWriteHeadHandler: remote.NewWriteHeadHandler(log.NewNopLogger(), nil, tc.rwFormat), + ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, + } api.Register(r) s := httptest.NewServer(r) From 95f1ee61cf31ecefca588a06ada3ff4d71f87b6f Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Thu, 21 Mar 2024 15:41:39 +0000 Subject: [PATCH 06/10] Fix todos around 405 HEAD handling Signed-off-by: Alex Greenbank --- storage/remote/client.go | 8 ++++++++ storage/remote/write.go | 9 +++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 32f58af5b2..d7838cb72b 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -242,6 +242,14 @@ func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { // Check for an error if httpResp.StatusCode != 200 { + if httpResp.StatusCode == 405 { + // If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't + // understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten + // even if it is blank + // This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives + // a response that confirms it can speak 2.0 + c.lastRWHeader = promHeader + } return promHeader, fmt.Errorf(httpResp.Status) } diff --git a/storage/remote/write.go b/storage/remote/write.go index 88a71cce45..0b79cfa2c2 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -199,15 +199,12 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { case Version1: // We use the standard value as there's no negotiation to be had case Version2: + rwFormat = Version2 // If this newer remote write format is enabled then we need to probe the remote server // to work out the desired protocol version and compressions // The value of the header is kept in the client so no need to see it here - rwFormat = Version2 - _, err := c.GetProtoVersions(context.Background()) // TODO(alexg) - better ctx to pass? - if err != nil { - // TODO(alexg) - Log an error based on this? - // TODO(alexg) - if we get 405 (MethodNotAllowed) then we should default to 1.0 (and downgrade rwFormat)? - } + _, _ = c.GetProtoVersions(context.Background()) + // TODO(alexg): Since they're never used should I remove the return values of GetProtoVersion()? } // Redacted to remove any passwords in the URL (that are From 7b40203302e0e1e2326730a367db389080c645cf Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Tue, 2 Apr 2024 14:51:39 +0000 Subject: [PATCH 07/10] Changes based on review comments Signed-off-by: Alex Greenbank --- storage/remote/client.go | 27 +++++++++++---------------- storage/remote/queue_manager.go | 2 +- storage/remote/queue_manager_test.go | 12 ++++++------ storage/remote/write.go | 4 ++-- 4 files changed, 20 insertions(+), 25 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index d7838cb72b..7ff17a5a1f 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -88,9 +88,8 @@ func init() { // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - remoteName string // Used to differentiate clients in metrics. - urlString string // url.String() - rwFormat config.RemoteWriteFormat // For write clients, ignored for read clients. + remoteName string // Used to differentiate clients in metrics. + urlString string // url.String() lastRWHeader string Client *http.Client timeout time.Duration @@ -173,7 +172,6 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { httpClient.Transport = otelhttp.NewTransport(t) return &Client{ - rwFormat: conf.RemoteWriteFormat, remoteName: name, urlString: conf.URL.String(), Client: httpClient, @@ -206,17 +204,14 @@ type RecoverableError struct { } // Attempt a HEAD request against a remote write endpoint to see what it supports. -func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { - // If we are in Version1 mode then don't even bother - if c.rwFormat == Version1 { - return RemoteWriteVersion1HeaderValue, nil - } +func (c *Client) probeRemoteVersions(ctx context.Context) error { + // We assume we are in Version2 mode otherwise we shouldn't be calling this httpReq, err := http.NewRequest("HEAD", c.urlString, nil) if err != nil { // Errors from NewRequest are from unparsable URLs, so are not // recoverable. - return "", err + return err } // Set the version header to be nice @@ -229,7 +224,7 @@ func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) if err != nil { // We don't attempt a retry here - return "", err + return err } // See if we got a header anyway @@ -250,11 +245,11 @@ func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { // a response that confirms it can speak 2.0 c.lastRWHeader = promHeader } - return promHeader, fmt.Errorf(httpResp.Status) + return fmt.Errorf(httpResp.Status) } - // All ok, return header and no error - return promHeader, nil + // All ok, return no error + return nil } // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled @@ -448,8 +443,8 @@ func NewTestClient(name, url string) WriteClient { return &TestClient{name: name, url: url} } -func (c *TestClient) GetProtoVersions(_ context.Context) (string, error) { - return "2.0;snappy,0.1.0", nil +func (c *TestClient) probeRemoteVersions(_ context.Context) error { + return nil } func (c *TestClient) Store(_ context.Context, req []byte, _ int, _ config.RemoteWriteFormat, _ string) error { diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index c5b2655835..7a3234de6c 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -395,7 +395,7 @@ type WriteClient interface { // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string // Get the protocol versions supported by the endpoint - GetProtoVersions(ctx context.Context) (string, error) + probeRemoteVersions(ctx context.Context) error // Get the last RW header received from the endpoint GetLastRWHeader() string } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index b0aaed4a64..a570d433d4 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1317,8 +1317,8 @@ func (c *TestWriteClient) Endpoint() string { return "http://test-remote.com/1234" } -func (c *TestWriteClient) GetProtoVersions(_ context.Context) (string, error) { - return "2.0;snappy,0.1.0", nil +func (c *TestWriteClient) probeRemoteVersions(_ context.Context) error { + return nil } func (c *TestWriteClient) GetLastRWHeader() string { @@ -1361,8 +1361,8 @@ func (c *TestBlockingWriteClient) Endpoint() string { return "http://test-remote-blocking.com/1234" } -func (c *TestBlockingWriteClient) GetProtoVersions(_ context.Context) (string, error) { - return "2.0;snappy,0.1.0", nil +func (c *TestBlockingWriteClient) probeRemoteVersions(_ context.Context) error { + return nil } func (c *TestBlockingWriteClient) GetLastRWHeader() string { @@ -1378,8 +1378,8 @@ func (c *NopWriteClient) Store(context.Context, []byte, int, config.RemoteWriteF } func (c *NopWriteClient) Name() string { return "nopwriteclient" } func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } -func (c *NopWriteClient) GetProtoVersions(_ context.Context) (string, error) { - return "2.0;snappy,0.1.0", nil +func (c *NopWriteClient) probeRemoteVersions(_ context.Context) error { + return nil } func (c *NopWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" } diff --git a/storage/remote/write.go b/storage/remote/write.go index 0b79cfa2c2..7f087ca24e 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -203,8 +203,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { // If this newer remote write format is enabled then we need to probe the remote server // to work out the desired protocol version and compressions // The value of the header is kept in the client so no need to see it here - _, _ = c.GetProtoVersions(context.Background()) - // TODO(alexg): Since they're never used should I remove the return values of GetProtoVersion()? + _ = c.probeRemoteVersions(context.Background()) + // We ignore any error here, at some point we may choose to log it } // Redacted to remove any passwords in the URL (that are From f73b77d4e327ce7c1c6d54eb1e0135ee22d51967 Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Tue, 2 Apr 2024 15:55:01 +0100 Subject: [PATCH 08/10] Update storage/remote/client.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Alex Greenbank --- storage/remote/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 7ff17a5a1f..1bce9185ee 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -214,7 +214,6 @@ func (c *Client) probeRemoteVersions(ctx context.Context) error { return err } - // Set the version header to be nice httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) httpReq.Header.Set("User-Agent", UserAgent) From 4a758f168565f48e1d3e04bde35b3c2819c875eb Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Thu, 4 Apr 2024 12:56:40 +0000 Subject: [PATCH 09/10] Latest updates to review comments Signed-off-by: Alex Greenbank --- storage/remote/client.go | 46 +++++++++------ storage/remote/queue_manager.go | 86 +++++++++++++++------------- storage/remote/write.go | 12 ++-- storage/remote/write_handler.go | 44 +++++++------- storage/remote/write_handler_test.go | 24 ++++---- web/api/v1/api.go | 2 +- 6 files changed, 115 insertions(+), 99 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 7ff17a5a1f..2f1cc7581c 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -47,8 +47,20 @@ const maxErrMsgLen = 1024 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) +// If we send a Remote Write 2.0 request to a Remote Write endpoint that only understands +// Remote Write 1.0 it will respond with an error. We need to handle these errors +// accordingly. Any 5xx errors will just need to be retried as they are considered +// transient/recoverable errors. A 4xx error will need to be passed back to the queue +// manager in order to be re-encoded in a suitable format. + +// A Remote Write 2.0 request sent to, for example, a Prometheus 2.50 receiver (which does +// not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver. var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 +// A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version) +// result in an HTTP 406 status code to indicate that it does not accept the protocol or +// encoding of that request and that the sender should retry with a more suitable protocol +// version or encoding. var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 var ( @@ -205,7 +217,7 @@ type RecoverableError struct { // Attempt a HEAD request against a remote write endpoint to see what it supports. func (c *Client) probeRemoteVersions(ctx context.Context) error { - // We assume we are in Version2 mode otherwise we shouldn't be calling this + // We assume we are in Version2 mode otherwise we shouldn't be calling this. httpReq, err := http.NewRequest("HEAD", c.urlString, nil) if err != nil { @@ -214,7 +226,7 @@ func (c *Client) probeRemoteVersions(ctx context.Context) error { return err } - // Set the version header to be nice + // Set the version header to be nice. httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) httpReq.Header.Set("User-Agent", UserAgent) @@ -223,32 +235,32 @@ func (c *Client) probeRemoteVersions(ctx context.Context) error { httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) if err != nil { - // We don't attempt a retry here + // We don't attempt a retry here. return err } - // See if we got a header anyway + // See if we got a header anyway. promHeader := httpResp.Header.Get(RemoteWriteVersionHeader) - // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank + // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank. if promHeader != "" { c.lastRWHeader = promHeader } - // Check for an error + // Check for an error. if httpResp.StatusCode != 200 { if httpResp.StatusCode == 405 { // If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't // understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten - // even if it is blank + // even if it is blank. // This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives - // a response that confirms it can speak 2.0 + // a response that confirms it can speak 2.0. c.lastRWHeader = promHeader } return fmt.Errorf(httpResp.Status) } - // All ok, return no error + // All ok, return no error. return nil } @@ -269,7 +281,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co if rwFormat == Version1 { httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue) } else { - // Set the right header if we're using v2.0 remote write protocol + // Set the right header if we're using v2.0 remote write protocol. httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) } @@ -294,9 +306,9 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co httpResp.Body.Close() }() - // See if we got a X-Prometheus-Remote-Write header in the response + // See if we got a X-Prometheus-Remote-Write header in the response. if promHeader := httpResp.Header.Get(RemoteWriteVersionHeader); promHeader != "" { - // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank + // Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank. // (It's blank if it wasn't present, we don't care about that distinction.) c.lastRWHeader = promHeader } @@ -309,18 +321,18 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co } switch httpResp.StatusCode { case 400: - // Return an unrecoverable error to indicate the 400 - // This then gets passed up the chain so we can react to it properly + // Return an unrecoverable error to indicate the 400. + // This then gets passed up the chain so we can react to it properly. // TODO(alexg) Do we want to include the first line of the message? return ErrStatusBadRequest case 406: - // Return an unrecoverable error to indicate the 406 - // This then gets passed up the chain so we can react to it properly + // Return an unrecoverable error to indicate the 406. + // This then gets passed up the chain so we can react to it properly. // TODO(alexg) Do we want to include the first line of the message? // TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error? return ErrStatusNotAcceptable default: - // We want to end up returning a non-specific error + // We want to end up returning a non-specific error. err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) } } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 7a3234de6c..15773bbcdf 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -394,15 +394,15 @@ type WriteClient interface { Name() string // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string - // Get the protocol versions supported by the endpoint + // Get the protocol versions supported by the endpoint. probeRemoteVersions(ctx context.Context) error - // Get the last RW header received from the endpoint + // Get the last RW header received from the endpoint. GetLastRWHeader() string } const ( Version1 config.RemoteWriteFormat = iota // 1.0, 0.1, etc. - Version2 // symbols are indices into an array of strings + Version2 // symbols are indices into an array of strings. ) // QueueManager manages a queue of samples to be sent to the Storage @@ -576,7 +576,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples. - // Get compression to use from content negotiation based on last header seen (defaults to snappy) + // Get compression to use from content negotiation based on last header seen (defaults to snappy). compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader()) req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression) @@ -1500,26 +1500,25 @@ func (q *queue) newBatch(capacity int) []timeSeries { func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) { if rwFormat == Version1 { - // If we're only handling Version1 then all we can do is that with snappy compression + // If we're only handling Version1 then all we can do is that with snappy compression. return "snappy", Version1 } if rwFormat != Version2 { // If we get here then someone has added a new RemoteWriteFormat value but hasn't - // fixed this function to handle it - // panic! + // fixed this function to handle it. Panic! panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat)) } if lastHeaderSeen == "" { - // We haven't had a valid header, so we just default to 0.1.0/snappy + // We haven't had a valid header, so we just default to "0.1.0/snappy". return "snappy", Version1 } // We can currently handle: // "2.0;snappy" // "0.1.0" - implicit compression of snappy - // lastHeaderSeen should contain a list of tuples - // If we find a match to something we can handle then we can return that + // lastHeaderSeen should contain a list of tuples. + // If we find a match to something we can handle then we can return that. for _, tuple := range strings.Split(lastHeaderSeen, ",") { - // Remove spaces from the tuple + // Remove spaces from the tuple. curr := strings.ReplaceAll(tuple, " ", "") switch curr { case "2.0;snappy": @@ -1529,7 +1528,7 @@ func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) } } - // Otherwise we have to default to "0.1.0" + // Otherwise we have to default to "0.1.0". return "snappy", Version1 } @@ -1626,20 +1625,20 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - // Resend logic on 406 - // ErrStatusNotAcceptable is a new error defined in client.go + // Resend logic on 406. + // ErrStatusNotAcceptable is a new error defined in client.go. - // Work out what version to send based on the last header seen and the QM's rwFormat setting - // TODO(alexg) - see comments below about retry/renegotiate design + // Work out what version to send based on the last header seen and the QM's rwFormat setting. + // TODO(alexg) - see comments below about retry/renegotiate design. for attemptNos := 1; attemptNos <= 3; attemptNos++ { lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) sendErr := attemptBatchSend(batch, rwFormat, compression, false) if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { - // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying + // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying. break } - // If we get either of the two errors (406, 400) we loop and re-negotiate + // If we get either of the two errors (406, 400) we loop and re-negotiate. } queue.ReturnForReuse(batch) @@ -1651,15 +1650,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { batch := queue.Batch() if len(batch) > 0 { for attemptNos := 1; attemptNos <= 3; attemptNos++ { - // Work out what version to send based on the last header seen and the QM's rwFormat setting + // Work out what version to send based on the last header seen and the QM's rwFormat setting. lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) sendErr := attemptBatchSend(batch, rwFormat, compression, true) if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { - // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying + // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying. break } - // If we get either of the two errors (406, 400) we loop and re-negotiate + // If we get either of the two errors (406, 400) we loop and re-negotiate. } // TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we // Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format @@ -1720,7 +1719,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin)) - // Return the error in case it is a 406 and we need to reformat the data + // Return the error in case it is a 406 and we need to reformat the data. return err } @@ -1729,7 +1728,7 @@ func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin)) - // Return the error in case it is a 406 and we need to reformat the data + // Return the error in case it is a 406 and we need to reformat the data. return err } @@ -2095,6 +2094,22 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms } +func compressPayload(tmpbuf *[]byte, inp []byte, compression string) ([]byte, error) { + var compressed []byte + + switch compression { + case "snappy": + compressed = snappy.Encode(*tmpbuf, inp) + if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) { + // grow the buffer for the next time + *tmpbuf = make([]byte, n) + } + return compressed, nil + default: + return compressed, fmt.Errorf("Unknown compression scheme [%s]", compression) + } +} + func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, compression string) ([]byte, int64, int64, error) { highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) @@ -2128,16 +2143,11 @@ func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metada var compressed []byte - switch compression { - case "snappy": - compressed = snappy.Encode(*buf, pBuf.Bytes()) - if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) - } - default: - return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression) + compressed, err = compressPayload(buf, pBuf.Bytes(), compression) + if err != nil { + return nil, highest, lowest, err } + return compressed, highest, lowest, nil } @@ -2205,15 +2215,9 @@ func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels var compressed []byte - switch compression { - case "snappy": - compressed = snappy.Encode(*buf, data) - if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) - } - default: - return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression) + compressed, err = compressPayload(buf, data, compression) + if err != nil { + return nil, highest, lowest, err } return compressed, highest, lowest, nil diff --git a/storage/remote/write.go b/storage/remote/write.go index 7f087ca24e..3f544456dd 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -192,19 +192,19 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { continue } - // Work out what protocol and compression to use for this endpoint - // Default to Remote Write Version1 + // Work out what protocol and compression to use for this endpoint. + // Default to Remote Write Version1. rwFormat := Version1 switch rwConf.ProtocolVersion { case Version1: - // We use the standard value as there's no negotiation to be had + // We use the standard value as there's no negotiation to be had. case Version2: rwFormat = Version2 // If this newer remote write format is enabled then we need to probe the remote server - // to work out the desired protocol version and compressions - // The value of the header is kept in the client so no need to see it here + // to work out the desired protocol version and compressions. + // The value of the header is kept in the client so no need to see it here. _ = c.probeRemoteVersions(context.Background()) - // We ignore any error here, at some point we may choose to log it + // We ignore any error here, at some point we may choose to log it. } // Redacted to remove any passwords in the URL (that are diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 7ace752e55..30918ba8ce 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -45,7 +45,7 @@ const ( ) func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string { - // Return the correct remote write header name/values based on provided rwFormat + // Return the correct remote write header name/values based on provided rwFormat. ret := make(map[string]string, 1) switch rwFormat { @@ -54,9 +54,9 @@ func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string { case Version2: // We need to add the supported protocol definitions in order: tuples := make([]string, 0, 2) - // Add 2.0;snappy + // Add "2.0;snappy". tuples = append(tuples, RemoteWriteVersion20HeaderValue+";snappy") - // Add default 0.1.0 + // Add default "0.1.0". tuples = append(tuples, RemoteWriteVersion1HeaderValue) ret[RemoteWriteVersionHeader] = strings.Join(tuples, ",") } @@ -68,8 +68,8 @@ type writeHeadHandler struct { remoteWrite20HeadRequests prometheus.Counter - // Experimental feature, new remote write proto format - // The handler will accept the new format, but it can still accept the old one + // Experimental feature, new remote write proto format. + // The handler will accept the new format, but it can still accept the old one. rwFormat config.RemoteWriteFormat } @@ -91,9 +91,9 @@ func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat } func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Send a response to the HEAD request based on the format supported + // Send a response to the HEAD request based on the format supported. - // Add appropriate header values for the specific rwFormat + // Add appropriate header values for the specific rwFormat. for hName, hValue := range rwHeaderNameValues(h.rwFormat) { w.Header().Set(hName, hValue) } @@ -107,8 +107,8 @@ type writeHandler struct { samplesWithInvalidLabelsTotal prometheus.Counter - // Experimental feature, new remote write proto format - // The handler will accept the new format, but it can still accept the old one + // Experimental feature, new remote write proto format. + // The handler will accept the new format, but it can still accept the old one. rwFormat config.RemoteWriteFormat } @@ -135,36 +135,36 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var err error - // Set the header(s) in the response based on the rwFormat the server supports + // Set the header(s) in the response based on the rwFormat the server supports. for hName, hValue := range rwHeaderNameValues(h.rwFormat) { w.Header().Set(hName, hValue) } - // Parse the headers to work out how to handle this + // Parse the headers to work out how to handle this. contentEncoding := r.Header.Get("Content-Encoding") protoVer := r.Header.Get(RemoteWriteVersionHeader) switch protoVer { case "": - // No header provided, assume 0.1.0 as everything that relies on later + // No header provided, assume 0.1.0 as everything that relies on later. protoVer = RemoteWriteVersion1HeaderValue case RemoteWriteVersion1HeaderValue, RemoteWriteVersion20HeaderValue: - // We know this header, woo + // We know this header, woo. default: - // We have a version in the header but it is not one we recognise + // We have a version in the header but it is not one we recognise. // TODO(alexg) - make a proper error for this? level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) - // Return a 406 so that the client can choose a more appropriate protocol to use + // Return a 406 so that the client can choose a more appropriate protocol to use. http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) return } - // Deal with 0.1.0 clients that forget to send Content-Encoding + // Deal with 0.1.0 clients that forget to send Content-Encoding. if protoVer == RemoteWriteVersion1HeaderValue && contentEncoding == "" { contentEncoding = "snappy" } - // Read the request body + // Read the request body. body, err := io.ReadAll(r.Body) if err != nil { level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) @@ -172,7 +172,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // Deal with contentEncoding first + // Deal with contentEncoding first. var decompressed []byte switch contentEncoding { @@ -185,13 +185,13 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } default: level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding) - // Return a 406 so that the client can choose a more appropriate protocol to use + // Return a 406 so that the client can choose a more appropriate protocol to use. http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable) return } - // Now we have a decompressed buffer we can unmarshal it - // At this point we are happy with the version but need to check the encoding + // Now we have a decompressed buffer we can unmarshal it. + // At this point we are happy with the version but need to check the encoding. switch protoVer { case RemoteWriteVersion1HeaderValue: var req prompb.WriteRequest @@ -202,7 +202,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } err = h.write(r.Context(), &req) case RemoteWriteVersion20HeaderValue: - // 2.0 request + // 2.0 request. var reqMinStr writev2.WriteRequest if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil { level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index c230117976..7063e988b8 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -51,19 +51,19 @@ func TestRemoteWriteHeadHandler(t *testing.T) { resp := recorder.Result() require.Equal(t, http.StatusOK, resp.StatusCode) - // Check header is expected value + // Check header is expected value. protHeader := resp.Header.Get(RemoteWriteVersionHeader) require.Equal(t, "2.0;snappy,0.1.0", protHeader) } func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) { - // Send a v2 request without a "Content-Encoding:" header -> 406 + // Send a v2 request without a "Content-Encoding:" header -> 406. buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - // Do not provide "Content-Encoding: snappy" header + // Do not provide "Content-Encoding: snappy" header. // req.Header.Set("Content-Encoding", "snappy") require.NoError(t, err) @@ -74,12 +74,12 @@ func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) { handler.ServeHTTP(recorder, req) resp := recorder.Result() - // Should give us a 406 + // Should give us a 406. require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) } func TestRemoteWriteHandlerInvalidCompression(t *testing.T) { - // Send a v2 request without an unhandled compression scheme -> 406 + // Send a v2 request without an unhandled compression scheme -> 406. buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) @@ -95,12 +95,12 @@ func TestRemoteWriteHandlerInvalidCompression(t *testing.T) { handler.ServeHTTP(recorder, req) resp := recorder.Result() - // Expect a 406 + // Expect a 406. require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) } func TestRemoteWriteHandlerInvalidVersion(t *testing.T) { - // Send a protocol version number that isn't recognised/supported -> 406 + // Send a protocol version number that isn't recognised/supported -> 406. buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) @@ -115,7 +115,7 @@ func TestRemoteWriteHandlerInvalidVersion(t *testing.T) { handler.ServeHTTP(recorder, req) resp := recorder.Result() - // Expect a 406 + // Expect a 406. require.Equal(t, http.StatusNotAcceptable, resp.StatusCode) } @@ -135,7 +135,7 @@ func TestRemoteWriteHandler(t *testing.T) { resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) - // Check header is expected value + // Check header is expected value. protHeader := resp.Header.Get(RemoteWriteVersionHeader) require.Equal(t, "0.1.0", protHeader) @@ -175,7 +175,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { req, err := http.NewRequest("", "", bytes.NewReader(buf)) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - // Must provide "Content-Encoding: snappy" header + // Must provide "Content-Encoding: snappy" header. req.Header.Set("Content-Encoding", "snappy") require.NoError(t, err) @@ -188,7 +188,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) - // Check header is expected value + // Check header is expected value. protHeader := resp.Header.Get(RemoteWriteVersionHeader) require.Equal(t, "2.0;snappy,0.1.0", protHeader) @@ -196,7 +196,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { j := 0 k := 0 // the reduced write request is equivalent to the write request fixture. - // we can use it for + // we can use it for. for _, ts := range writeRequestMinimizedFixture.Timeseries { ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols) for _, s := range ts.Samples { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 203ed6d147..09828e079f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -297,7 +297,7 @@ func NewAPI( } if rwEnabled { - // TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising + // TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising. // For rollout we do two phases: // 0. (Before) no flags set // 1. (During) support new protocols but don't advertise From ae9d2a772abababe156e003bc72e71b5f797a30f Mon Sep 17 00:00:00 2001 From: Alex Greenbank Date: Thu, 4 Apr 2024 13:27:28 +0000 Subject: [PATCH 10/10] latest tweaks Signed-off-by: Alex Greenbank --- storage/remote/queue_manager_test.go | 34 +++++++++++++--------------- storage/remote/write_handler.go | 1 - storage/remote/write_handler_test.go | 2 +- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index a570d433d4..447932987b 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -78,55 +78,55 @@ func TestContentNegotiation(t *testing.T) { rwFormat config.RemoteWriteFormat steps []contentNegotiationStep }{ - // Test a simple case where the v2 request we send is processed first time + // Test a simple case where the v2 request we send is processed first time. { success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, }, }, - // Test a simple case where the v1 request we send is processed first time + // Test a simple case where the v1 request we send is processed first time. { success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, }, }, - // Test a case where the v1 request has a temporary delay but goes through on retry - // There is no content re-negotiation between first and retry attempts + // Test a case where the v1 request has a temporary delay but goes through on retry. + // There is no content re-negotiation between first and retry attempts. { success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, }, }, - // Repeat the above test but with v2. The request has a temporary delay but goes through on retry - // There is no content re-negotiation between first and retry attempts + // Repeat the above test but with v2. The request has a temporary delay but goes through on retry. + // There is no content re-negotiation between first and retry attempts. { success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, }, }, - // Now test where the server suddenly stops speaking 2.0 and we need to downgrade + // Now test where the server suddenly stops speaking 2.0 and we need to downgrade. { success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, }, }, - // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400 + // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400. { success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, }, }, - // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only + // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only. { success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"}, - // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries) + // There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries). }, }, } @@ -138,8 +138,6 @@ func TestContentNegotiation(t *testing.T) { // We need to set URL's so that metric creation doesn't panic. writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig.QueueConfig = queueConfig - writeConfig.SendExemplars = true // ALEXG - need? - writeConfig.SendNativeHistograms = true // ALEXG - need? conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -175,7 +173,7 @@ func TestContentNegotiation(t *testing.T) { qm := s.rws.queues[hash] c := NewTestWriteClient(tc.rwFormat) - c.setSteps(tc.steps) // set expected behaviour + c.setSteps(tc.steps) // set expected behaviour. qm.SetClient(c) qm.StoreSeries(series, 0) @@ -188,12 +186,12 @@ func TestContentNegotiation(t *testing.T) { qm.Append(samples) if !tc.success { - // We just need to sleep for a bit to give it time to run + // We just need to sleep for a bit to give it time to run. time.Sleep(2 * time.Second) - // But we still need to check for data with no delay to avoid race + // But we still need to check for data with no delay to avoid race. c.waitForExpectedData(t, 0*time.Second) } else { - // We expected data so wait for it + // We expected data so wait for it. c.waitForExpectedData(t, 5*time.Second) } @@ -1253,11 +1251,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression) if attemptNos > 0 { - // If this is a second attempt then we need to bump to the next step otherwise we loop + // If this is a second attempt then we need to bump to the next step otherwise we loop. c.currstep++ } - // Check if we've been told to return something for this config + // Check if we've been told to return something for this config. if len(c.steps) > 0 { if err = c.steps[c.currstep].behaviour; err != nil { c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err)) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 30918ba8ce..b80799706d 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -152,7 +152,6 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // We know this header, woo. default: // We have a version in the header but it is not one we recognise. - // TODO(alexg) - make a proper error for this? level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) // Return a 406 so that the client can choose a more appropriate protocol to use. http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 7063e988b8..f3f8ca0ba3 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -196,7 +196,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { j := 0 k := 0 // the reduced write request is equivalent to the write request fixture. - // we can use it for. + // we can use it for for _, ts := range writeRequestMinimizedFixture.Timeseries { ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols) for _, s := range ts.Samples {