diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 2bc2237e2f..46246b672a 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -116,7 +116,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s // Encode the request body into snappy encoding. compressed := snappy.Encode(nil, raw) - err = client.Store(context.Background(), compressed) + err = client.Store(context.Background(), compressed, 0) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false diff --git a/storage/remote/client.go b/storage/remote/client.go index 33774203c5..d493e414f4 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -195,7 +195,7 @@ type RecoverableError struct { // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // and encoded bytes from codec.go. -func (c *Client) Store(ctx context.Context, req []byte) error { +func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpReq, err := http.NewRequest("POST", c.urlString, bytes.NewReader(req)) if err != nil { // Errors from NewRequest are from unparsable URLs, so are not @@ -207,6 +207,10 @@ func (c *Client) Store(ctx context.Context, req []byte) error { httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", UserAgent) httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + if attempt > 0 { + httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt)) + } + ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 2f954a0b9f..a42a52e9a7 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { c, err := NewWriteClient(hash, conf) require.NoError(t, err) - err = c.Store(context.Background(), []byte{}) + err = c.Store(context.Background(), []byte{}, 0) if test.err != nil { require.EqualError(t, err, test.err.Error()) } else { @@ -112,7 +112,7 @@ func TestClientRetryAfter(t *testing.T) { var recErr RecoverableError c := getClient(conf) - err = c.Store(context.Background(), []byte{}) + err = c.Store(context.Background(), []byte{}, 0) require.False(t, errors.As(err, &recErr), "Recoverable error not expected.") conf = &ClientConfig{ @@ -122,7 +122,7 @@ func TestClientRetryAfter(t *testing.T) { } c = getClient(conf) - err = c.Store(context.Background(), []byte{}) + err = c.Store(context.Background(), []byte{}, 0) require.True(t, errors.As(err, &recErr), "Recoverable error was expected.") } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 1c834db776..975ff9af71 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -380,7 +380,7 @@ func (m *queueManagerMetrics) unregister() { // external timeseries database. type WriteClient interface { // Store stores the given samples in the remote storage. - Store(context.Context, []byte) error + Store(context.Context, []byte, int) error // Name uniquely identifies the remote storage. Name() string // Endpoint is the remote read or write endpoint for the storage client. @@ -552,7 +552,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p } begin := time.Now() - err := t.storeClient.Store(ctx, req) + err := t.storeClient.Store(ctx, req, try) t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { @@ -1526,7 +1526,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti s.qm.metrics.samplesTotal.Add(float64(sampleCount)) s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) - err := s.qm.client().Store(ctx, *buf) + err := s.qm.client().Store(ctx, *buf, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index b43258ff06..a141df3480 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -769,7 +769,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { } } -func (c *TestWriteClient) Store(_ context.Context, req []byte) error { +func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { c.mtx.Lock() defer c.mtx.Unlock() // nil buffers are ok for snappy, ignore cast error. @@ -843,7 +843,7 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient { return &TestBlockingWriteClient{} } -func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error { +func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error { c.numCalls.Inc() <-ctx.Done() return nil @@ -864,10 +864,10 @@ func (c *TestBlockingWriteClient) Endpoint() string { // For benchmarking the send and not the receive side. type NopWriteClient struct{} -func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} } -func (c *NopWriteClient) Store(context.Context, []byte) error { return nil } -func (c *NopWriteClient) Name() string { return "nopwriteclient" } -func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } +func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} } +func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil } +func (c *NopWriteClient) Name() string { return "nopwriteclient" } +func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } func BenchmarkSampleSend(b *testing.B) { // Send one sample per series, which is the typical remote_write case