diff --git a/CHANGELOG.md b/CHANGELOG.md index d5a91e900..e7314d041 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## unreleased +* [FEATURE] Remote-Write: Add sender and receiver support for [Remote Write 2.0-rc.2](https://prometheus.io/docs/specs/remote_write_spec_2_0/) specification #14395 #14427 #14444 +* [ENHANCEMENT] Remote-Write: 1.x messages against Remote Write 2.x Receivers will now have correct values for `prometheus_storage_<samples|histograms|exemplars>_failed_total` in case of partial errors #14444 + ## 2.53.1 / 2024-07-10 Fix a bug which would drop samples in remote-write if the sending flow stalled diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 46246b672..6d162f459 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -101,6 +101,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return successExitCode } +// TODO(bwplotka): Add PRW 2.0 support. func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool { metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels) if err != nil { @@ -116,7 +117,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s // Encode the request body into snappy encoding. compressed := snappy.Encode(nil, raw) - err = client.Store(context.Background(), compressed, 0) + _, err = client.Store(context.Background(), compressed, 0) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false diff --git a/storage/remote/client.go b/storage/remote/client.go index eff44c606..17caf7be9 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -14,7 +14,6 @@ package remote import ( - "bufio" "bytes" "context" "fmt" @@ -235,12 +234,12 @@ type RecoverableError struct { // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // and encoded bytes from codec.go. 
-func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { +func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteResponseStats, error) { httpReq, err := http.NewRequest(http.MethodPost, c.urlString, bytes.NewReader(req)) if err != nil { // Errors from NewRequest are from unparsable URLs, so are not // recoverable. - return err + return WriteResponseStats{}, err } httpReq.Header.Add("Content-Encoding", string(c.writeCompression)) @@ -267,28 +266,34 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { if err != nil { // Errors from Client.Do are from (for example) network errors, so are // recoverable. - return RecoverableError{err, defaultBackoff} + return WriteResponseStats{}, RecoverableError{err, defaultBackoff} } defer func() { io.Copy(io.Discard, httpResp.Body) httpResp.Body.Close() }() + // TODO(bwplotka): Pass logger and emit debug on error? + // Parsing error means there were some response header values we can't parse, + // we can continue handling. + rs, _ := ParseWriteResponseStats(httpResp) + //nolint:usestdlibvars - if httpResp.StatusCode/100 != 2 { - scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) - line := "" - if scanner.Scan() { - line = scanner.Text() - } - err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) + if httpResp.StatusCode/100 == 2 { + return rs, nil } + + // Handling errors e.g. read potential error in the body. + // TODO(bwplotka): Pass logger and emit debug on error? 
+ body, _ := io.ReadAll(io.LimitReader(httpResp.Body, maxErrMsgLen)) + err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, body) + //nolint:usestdlibvars if httpResp.StatusCode/100 == 5 || (c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) { - return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))} + return rs, RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))} } - return err + return rs, err } // retryAfterDuration returns the duration for the Retry-After header. In case of any errors, it diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 2acb8e279..9184ce100 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { c, err := NewWriteClient(hash, conf) require.NoError(t, err) - err = c.Store(context.Background(), []byte{}, 0) + _, err = c.Store(context.Background(), []byte{}, 0) if test.err != nil { require.EqualError(t, err, test.err.Error()) } else { @@ -133,7 +133,7 @@ func TestClientRetryAfter(t *testing.T) { c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit)) var recErr RecoverableError - err = c.Store(context.Background(), []byte{}, 0) + _, err = c.Store(context.Background(), []byte{}, 0) require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.") if tc.expectedRecoverable { require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter) @@ -169,7 +169,7 @@ func TestRetryAfterDuration(t *testing.T) { } } -func TestClientHeaders(t *testing.T) { +func TestClientCustomHeaders(t *testing.T) { headersToSend := map[string]string{"Foo": "Bar", "Baz": "qux"} var called bool @@ -203,7 +203,7 @@ func TestClientHeaders(t *testing.T) { c, err := NewWriteClient("c", conf) require.NoError(t, err) - err = c.Store(context.Background(), []byte{}, 0) + _, err = c.Store(context.Background(), []byte{}, 
0) require.NoError(t, err) require.True(t, called, "The remote server wasn't called") diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 5bafb9da2..5b59288e6 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -391,7 +391,7 @@ func (m *queueManagerMetrics) unregister() { // external timeseries database. type WriteClient interface { // Store stores the given samples in the remote storage. - Store(ctx context.Context, req []byte, retryAttempt int) error + Store(ctx context.Context, req []byte, retryAttempt int) (WriteResponseStats, error) // Name uniquely identifies the remote storage. Name() string // Endpoint is the remote read or write endpoint for the storage client. @@ -597,14 +597,15 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p } begin := time.Now() - err := t.storeClient.Store(ctx, req, try) + // Ignoring WriteResponseStats, because there is nothing for metadata, since it's + // embedded in v2 calls now, and we do v1 here. 
+ _, err := t.storeClient.Store(ctx, req, try) t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { span.RecordError(err) return err } - return nil } @@ -1661,8 +1662,8 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc) - s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin)) + rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc) + s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, rs, time.Since(begin)) return err } @@ -1670,17 +1671,29 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s // See https://github.com/prometheus/prometheus/issues/14409 func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error { begin := time.Now() - err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc) - s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin)) + rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc) + s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, rs, time.Since(begin)) return err } -func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) { +func (s *shards) updateMetrics(_ 
context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, rs WriteResponseStats, duration time.Duration) { + // Partial errors may happen -- account for that. + sampleDiff := sampleCount - rs.Samples + if sampleDiff > 0 { + s.qm.metrics.failedSamplesTotal.Add(float64(sampleDiff)) + } + histogramDiff := histogramCount - rs.Histograms + if histogramDiff > 0 { + s.qm.metrics.failedHistogramsTotal.Add(float64(histogramDiff)) + } + exemplarDiff := exemplarCount - rs.Exemplars + if exemplarDiff > 0 { + s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarDiff)) + } if err != nil { - level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "histogramCount", histogramCount, "err", err) - s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) + level.Error(s.qm.logger).Log("msg", "non-recoverable error", "failedSampleCount", sampleDiff, "failedHistogramCount", histogramDiff, "failedExemplarCount", exemplarDiff, "err", err) + } else if sampleDiff+exemplarDiff+histogramDiff > 0 { + level.Error(s.qm.logger).Log("msg", "we got 2xx status code from the Receiver yet statistics indicate some dat was not written; investigation needed", "failedSampleCount", sampleDiff, "failedHistogramCount", histogramDiff, "failedExemplarCount", exemplarDiff) } // These counters are used to calculate the dynamic sharding, and as such @@ -1688,6 +1701,7 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl s.qm.dataOut.incr(int64(sampleCount + exemplarCount + histogramCount + metadataCount)) s.qm.dataOutDuration.incr(int64(duration)) s.qm.lastSendTimestamp.Store(time.Now().Unix()) + // Pending samples/exemplars/histograms also should be subtracted, as an error means // they will not be retried. 
s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) @@ -1699,19 +1713,29 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) (WriteResponseStats, error) { // Build the WriteRequest with no metadata. req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. - return err + return WriteResponseStats{}, err } reqSize := len(req) *buf = req + // Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need + // to track the total amount of accepted data across the various attempts. + accumulatedStats := WriteResponseStats{} + var accumulatedStatsMu sync.Mutex + addStats := func(rs WriteResponseStats) { + accumulatedStatsMu.Lock() + accumulatedStats = accumulatedStats.Add(rs) + accumulatedStatsMu.Unlock() + } + // An anonymous function allows us to defer the completion of our per-try spans // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. 
@@ -1759,15 +1783,19 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) s.qm.metrics.metadataTotal.Add(float64(metadataCount)) - err := s.qm.client().Store(ctx, *buf, try) + // Technically for v1, we will likely have empty response stats, but for + // newer Receivers this might not be the case, so we use them on a best-effort basis. + rs, err := s.qm.client().Store(ctx, *buf, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) + // TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error + // so far we don't have those, so it's ok to potentially skew statistics. + addStats(rs) - if err != nil { - span.RecordError(err) - return err + if err == nil { + return nil } - - return nil + span.RecordError(err) + return err } onRetry := func() { @@ -1780,29 +1808,48 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti if errors.Is(err, context.Canceled) { // When there is resharding, we cancel the context for this queue, which means the data is not sent. // So we exit early to not update the metrics. - return err + return accumulatedStats, err } s.qm.metrics.sentBytesTotal.Add(float64(reqSize)) s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) - return err + if err == nil && !accumulatedStats.Confirmed { + // No 2.0 response headers, and we sent v1 message, so likely it's 1.0 Receiver. + // Assume success, don't rely on headers. + return WriteResponseStats{ + Samples: sampleCount, + Histograms: histogramCount, + Exemplars: exemplarCount, + }, nil + } + return accumulatedStats, err } // sendV2Samples to the remote storage with backoff for recoverable errors. 
-func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error { +func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) (WriteResponseStats, error) { // Build the WriteRequest with no metadata. req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. - return err + return WriteResponseStats{}, err } reqSize := len(req) *buf = req + // Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need + // to track the total amount of accepted data across the various attempts. + accumulatedStats := WriteResponseStats{} + var accumulatedStatsMu sync.Mutex + addStats := func(rs WriteResponseStats) { + accumulatedStatsMu.Lock() + accumulatedStats = accumulatedStats.Add(rs) + accumulatedStatsMu.Unlock() + } + // An anonymous function allows us to defer the completion of our per-try spans // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. 
@@ -1850,15 +1897,28 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) s.qm.metrics.metadataTotal.Add(float64(metadataCount)) - err := s.qm.client().Store(ctx, *buf, try) + rs, err := s.qm.client().Store(ctx, *buf, try) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) + // TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error + // so far we don't have those, so it's ok to potentially skew statistics. + addStats(rs) - if err != nil { - span.RecordError(err) - return err + if err == nil { + // Check the case mentioned in PRW 2.0 + // https://prometheus.io/docs/specs/remote_write_spec_2_0/#required-written-response-headers. + if sampleCount+histogramCount+exemplarCount > 0 && rs.NoDataWritten() { + err = fmt.Errorf("sent v2 request with %v samples, %v histograms and %v exemplars; got 2xx, but PRW 2.0 response header statistics indicate %v samples, %v histograms and %v exemplars were accepted;"+ + " assumining failure e.g. the target only supports PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly", + sampleCount, histogramCount, exemplarCount, + rs.Samples, rs.Histograms, rs.Exemplars, + ) + span.RecordError(err) + return err + } + return nil } - - return nil + span.RecordError(err) + return err } onRetry := func() { @@ -1871,13 +1931,12 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 if errors.Is(err, context.Canceled) { // When there is resharding, we cancel the context for this queue, which means the data is not sent. // So we exit early to not update the metrics. 
- return err + return accumulatedStats, err } s.qm.metrics.sentBytesTotal.Add(float64(reqSize)) s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) - - return err + return accumulatedStats, err } func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 5227c2d6a..7343184fc 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -118,10 +118,10 @@ func TestBasicContentNegotiation(t *testing.T) { expectFail: true, }, { - name: "v2 talks to v1 that tries to unmarshal v2 payload with v1 proto", + name: "v2 talks to (broken) v1 that tries to unmarshal v2 payload with v1 proto", senderProtoMsg: config.RemoteWriteProtoMsgV2, receiverProtoMsg: config.RemoteWriteProtoMsgV1, injectErrs: []error{nil}, - expectFail: true, // invalid request, no timeseries + expectFail: true, // We detect this thanks to https://github.com/prometheus/prometheus/issues/14359 }, // Opposite, v1 talking to v2 only server. { @@ -130,12 +130,6 @@ func TestBasicContentNegotiation(t *testing.T) { injectErrs: []error{errors.New("pretend unrecoverable err")}, expectFail: true, }, - { - name: "v1 talks to (broken) v2 that tries to unmarshal v1 payload with v2 proto", - senderProtoMsg: config.RemoteWriteProtoMsgV1, receiverProtoMsg: config.RemoteWriteProtoMsgV2, - injectErrs: []error{nil}, - expectFail: true, // invalid request, no timeseries - }, } { t.Run(tc.name, func(t *testing.T) { dir := t.TempDir() @@ -182,7 +176,6 @@ func TestBasicContentNegotiation(t *testing.T) { if !tc.expectFail { // No error expected, so wait for data. 
c.waitForExpectedData(t, 5*time.Second) - require.Equal(t, 1, c.writesReceived) require.Equal(t, 0.0, client_testutil.ToFloat64(qm.metrics.failedSamplesTotal)) } else { // Wait for failure to be recorded in metrics. @@ -190,11 +183,10 @@ func TestBasicContentNegotiation(t *testing.T) { defer cancel() require.NoError(t, runutil.Retry(500*time.Millisecond, ctx.Done(), func() error { if client_testutil.ToFloat64(qm.metrics.failedSamplesTotal) != 1.0 { - return errors.New("expected one sample failed in qm metrics") + return fmt.Errorf("expected one sample failed in qm metrics; got %v", client_testutil.ToFloat64(qm.metrics.failedSamplesTotal)) } return nil })) - require.Equal(t, 0, c.writesReceived) } // samplesTotal means attempts. @@ -764,10 +756,10 @@ func TestDisableReshardOnRetry(t *testing.T) { metrics = newQueueManagerMetrics(nil, "", "") client = &MockWriteClient{ - StoreFunc: func(ctx context.Context, b []byte, i int) error { + StoreFunc: func(ctx context.Context, b []byte, i int) (WriteResponseStats, error) { onStoreCalled() - return RecoverableError{ + return WriteResponseStats{}, RecoverableError{ error: fmt.Errorf("fake error"), retryAfter: model.Duration(retryAfter), } @@ -1113,14 +1105,14 @@ func (c *TestWriteClient) SetReturnError(err error) { c.returnError = err } -func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { +func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResponseStats, error) { c.mtx.Lock() defer c.mtx.Unlock() if c.storeWait > 0 { time.Sleep(c.storeWait) } if c.returnError != nil { - return c.returnError + return WriteResponseStats{}, c.returnError } // nil buffers are ok for snappy, ignore cast error. 
if c.buf != nil { @@ -1130,14 +1122,14 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { reqBuf, err := snappy.Decode(c.buf, req) c.buf = reqBuf if err != nil { - return err + return WriteResponseStats{}, err } // Check if we've been told to inject err for this call. if len(c.injectedErrs) > 0 { c.currErr++ if err = c.injectedErrs[c.currErr]; err != nil { - return err + return WriteResponseStats{}, err } } @@ -1156,13 +1148,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { } } if err != nil { - return err - } - - if len(reqProto.Timeseries) == 0 && len(reqProto.Metadata) == 0 { - return errors.New("invalid request, no timeseries") + return WriteResponseStats{}, err } + rs := WriteResponseStats{} b := labels.NewScratchBuilder(0) for _, ts := range reqProto.Timeseries { labels := ts.ToLabels(&b, nil) @@ -1170,10 +1159,12 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { if len(ts.Samples) > 0 { c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...) } + rs.Samples += len(ts.Samples) if len(ts.Exemplars) > 0 { c.receivedExemplars[tsID] = append(c.receivedExemplars[tsID], ts.Exemplars...) 
} + rs.Exemplars += len(ts.Exemplars) for _, h := range ts.Histograms { if h.IsFloatHistogram() { @@ -1182,13 +1173,14 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { c.receivedHistograms[tsID] = append(c.receivedHistograms[tsID], h) } } + rs.Histograms += len(ts.Histograms) } for _, m := range reqProto.Metadata { c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) } c.writesReceived++ - return nil + return rs, nil } func (c *TestWriteClient) Name() string { @@ -1256,10 +1248,10 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient { return &TestBlockingWriteClient{} } -func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error { +func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) (WriteResponseStats, error) { c.numCalls.Inc() <-ctx.Done() - return nil + return WriteResponseStats{}, nil } func (c *TestBlockingWriteClient) NumCalls() uint64 { @@ -1278,19 +1270,19 @@ func (c *TestBlockingWriteClient) Endpoint() string { type NopWriteClient struct{} func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} } -func (c *NopWriteClient) Store(context.Context, []byte, int) error { - return nil +func (c *NopWriteClient) Store(context.Context, []byte, int) (WriteResponseStats, error) { + return WriteResponseStats{}, nil } func (c *NopWriteClient) Name() string { return "nopwriteclient" } func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } type MockWriteClient struct { - StoreFunc func(context.Context, []byte, int) error + StoreFunc func(context.Context, []byte, int) (WriteResponseStats, error) NameFunc func() string EndpointFunc func() string } -func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) error { +func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) (WriteResponseStats, error) { return c.StoreFunc(ctx, bb, n) } func (c *MockWriteClient) Name() string { return 
c.NameFunc() } diff --git a/storage/remote/stats.go b/storage/remote/stats.go new file mode 100644 index 000000000..89d00ffc3 --- /dev/null +++ b/storage/remote/stats.go @@ -0,0 +1,107 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "errors" + "net/http" + "strconv" +) + +const ( + rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written" + rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written" + rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written" +) + +// WriteResponseStats represents the response write statistics specified in https://github.com/prometheus/docs/pull/2486 +type WriteResponseStats struct { + // Samples represents the X-Prometheus-Remote-Write-Samples-Written header value. + Samples int + // Histograms represents the X-Prometheus-Remote-Write-Histograms-Written header value. + Histograms int + // Exemplars represents the X-Prometheus-Remote-Write-Exemplars-Written header value. + Exemplars int + + // Confirmed means we can trust those statistics from the point of view + // of the PRW 2.0 spec. When parsed from headers, it means we got at least one + // response header from the Receiver to confirm those numbers, meaning it must + // be at least a 2.0 Receiver. See ParseWriteResponseStats for details. + Confirmed bool +} + +// NoDataWritten returns true if statistics indicate no data was written. 
+func (s WriteResponseStats) NoDataWritten() bool { + return (s.Samples + s.Histograms + s.Exemplars) == 0 +} + +// AllSamples returns both float and histogram sample numbers. +func (s WriteResponseStats) AllSamples() int { + return s.Samples + s.Histograms +} + +// Add returns the sum of this WriteResponseStats plus the given WriteResponseStats; Confirmed is taken from the given one. +func (s WriteResponseStats) Add(rs WriteResponseStats) WriteResponseStats { + s.Confirmed = rs.Confirmed + s.Samples += rs.Samples + s.Histograms += rs.Histograms + s.Exemplars += rs.Exemplars + return s +} + +// SetHeaders sets response headers in a given response writer. +// Make sure to use it before http.ResponseWriter.WriteHeader and .Write. +func (s WriteResponseStats) SetHeaders(w http.ResponseWriter) { + h := w.Header() + h.Set(rw20WrittenSamplesHeader, strconv.Itoa(s.Samples)) + h.Set(rw20WrittenHistogramsHeader, strconv.Itoa(s.Histograms)) + h.Set(rw20WrittenExemplarsHeader, strconv.Itoa(s.Exemplars)) +} + +// ParseWriteResponseStats returns WriteResponseStats parsed from the response headers. +// +// As per 2.0 spec, missing header means 0. However, abrupt HTTP errors, 1.0 Receivers +// or buggy 2.0 Receivers might result in no response headers specified and that +// might NOT necessarily mean nothing was written. To represent that we set +// s.Confirmed = true only when we see at least one response header. +// +// An error is returned when any of the headers fails to parse as an int. +func ParseWriteResponseStats(r *http.Response) (s WriteResponseStats, err error) { + var ( + errs []error + h = r.Header + ) + if v := h.Get(rw20WrittenSamplesHeader); v != "" { // Empty means zero. + s.Confirmed = true + if s.Samples, err = strconv.Atoi(v); err != nil { + s.Samples = 0 + errs = append(errs, err) + } + } + if v := h.Get(rw20WrittenHistogramsHeader); v != "" { // Empty means zero. 
+ s.Confirmed = true + if s.Histograms, err = strconv.Atoi(v); err != nil { + s.Histograms = 0 + errs = append(errs, err) + } + } + if v := h.Get(rw20WrittenExemplarsHeader); v != "" { // Empty means zero. + s.Confirmed = true + if s.Exemplars, err = strconv.Atoi(v); err != nil { + s.Exemplars = 0 + errs = append(errs, err) + } + } + return s, errors.Join(errs...) +} diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index d82237371..6756bf0ab 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -19,7 +19,6 @@ import ( "fmt" "io" "net/http" - "strconv" "strings" "time" @@ -201,7 +200,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { respStats, errHTTPCode, err := h.writeV2(r.Context(), &req) // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases. - respStats.SetResponseHeaders(w.Header()) + respStats.SetHeaders(w) if err != nil { if errHTTPCode/5 == 100 { // 5xx @@ -318,24 +317,6 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist return nil } -const ( - prw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Written-Samples" - rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Written-Histograms" - rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Written-Exemplars" -) - -type responseStats struct { - samples int - histograms int - exemplars int -} - -func (s responseStats) SetResponseHeaders(h http.Header) { - h.Set(prw20WrittenSamplesHeader, strconv.Itoa(s.samples)) - h.Set(rw20WrittenHistogramsHeader, strconv.Itoa(s.histograms)) - h.Set(rw20WrittenExemplarsHeader, strconv.Itoa(s.exemplars)) -} - // writeV2 is similar to write, but it works with v2 proto message, // allows partial 4xx writes and gathers statistics. // @@ -345,14 +326,14 @@ func (s responseStats) SetResponseHeaders(h http.Header) { // // NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors. 
// Once we have 5xx type of error, we immediately stop and rollback all appends. -func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ responseStats, errHTTPCode int, _ error) { +func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) { app := &timeLimitAppender{ Appender: h.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } - rs := responseStats{} - samplesWithoutMetadata, errHTTPCode, err := h.appendV2(app, req, &rs) + s := WriteResponseStats{} + samplesWithoutMetadata, errHTTPCode, err := h.appendV2(app, req, &s) if err != nil { if errHTTPCode/5 == 100 { // On 5xx, we always rollback, because we expect @@ -360,29 +341,29 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ res if rerr := app.Rollback(); rerr != nil { level.Error(h.logger).Log("msg", "writev2 rollback failed on retry-able error", "err", rerr) } - return responseStats{}, errHTTPCode, err + return WriteResponseStats{}, errHTTPCode, err } // Non-retriable (e.g. bad request error case). Can be partially written. commitErr := app.Commit() if commitErr != nil { // Bad requests does not matter as we have internal error (retryable). - return responseStats{}, http.StatusInternalServerError, commitErr + return WriteResponseStats{}, http.StatusInternalServerError, commitErr } // Bad request error happened, but rest of data (if any) was written. h.samplesAppendedWithoutMetadata.Add(float64(samplesWithoutMetadata)) - return rs, errHTTPCode, err + return s, errHTTPCode, err } // All good just commit. 
if err := app.Commit(); err != nil { - return responseStats{}, http.StatusInternalServerError, err + return WriteResponseStats{}, http.StatusInternalServerError, err } h.samplesAppendedWithoutMetadata.Add(float64(samplesWithoutMetadata)) - return rs, 0, nil + return s, 0, nil } -func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *responseStats) (samplesWithoutMetadata, errHTTPCode int, err error) { +func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *WriteResponseStats) (samplesWithoutMetadata, errHTTPCode int, err error) { var ( badRequestErrs []error outOfOrderExemplarErrs, samplesWithInvalidLabels int @@ -400,14 +381,14 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * continue } - allSamplesSoFar := rs.samples + rs.histograms + allSamplesSoFar := rs.AllSamples() var ref storage.SeriesRef // Samples. for _, s := range ts.Samples { ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) if err == nil { - rs.samples++ + rs.Samples++ continue } // Handle append error. @@ -431,7 +412,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, hp.ToIntHistogram(), nil) } if err == nil { - rs.histograms++ + rs.Histograms++ continue } // Handle append error. @@ -453,18 +434,19 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * e := ep.ToExemplar(&b, req.Symbols) ref, err = app.AppendExemplar(ref, ls, e) if err == nil { - rs.exemplars++ + rs.Exemplars++ continue } // Handle append error. - // TODO(bwplotka): I left the logic as in v1, but we might want to make it consistent with samples and histograms. - // Since exemplar storage is still experimental, we don't fail in anyway, the request on ingestion errors. 
if errors.Is(err, storage.ErrOutOfOrderExemplar) { - outOfOrderExemplarErrs++ - level.Debug(h.logger).Log("msg", "Out of order exemplar", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) + outOfOrderExemplarErrs++ // Maintain old metrics, but technically not needed, given we fail here. + level.Error(h.logger).Log("msg", "Out of order exemplar", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) + badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String())) continue } - level.Debug(h.logger).Log("msg", "Error while adding exemplar in AppendExemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err) + // TODO(bwplotka): Add strict mode which would trigger rollback of everything if needed. + // For now we keep the previously released flow (just error not debug level) of dropping them without rollback and 5xx. + level.Error(h.logger).Log("msg", "failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) } m := ts.ToMetadata(req.Symbols) @@ -472,7 +454,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err) // Metadata is attached to each series, so since Prometheus does not reject sample without metadata information, // we don't report remote write error either. We increment metric instead. 
- samplesWithoutMetadata += (rs.samples + rs.histograms) - allSamplesSoFar + samplesWithoutMetadata += rs.AllSamples() - allSamplesSoFar } } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 9b5fb1a6e..af2229b9a 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -398,7 +398,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { { desc: "Partial write; skipped exemplar; exemplar storage errs are noop", input: writeV2RequestFixture.Timeseries, - appendExemplarErr: errors.New("some exemplar append error"), + appendExemplarErr: errors.New("some exemplar internal append error"), expectedCode: http.StatusNoContent, }, @@ -449,9 +449,9 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { if tc.expectedCode == http.StatusInternalServerError { // We don't expect writes for partial writes with retry-able code. - expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Samples")) - expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Histograms")) - expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars")) + expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenSamplesHeader)) + expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenHistogramsHeader)) + expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) require.Empty(t, len(appendable.samples)) require.Empty(t, len(appendable.histograms)) @@ -462,12 +462,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { // Double check mandatory 2.0 stats. // writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each. 
- expectHeaderValue(t, 2, resp.Header.Get("X-Prometheus-Remote-Write-Written-Samples")) - expectHeaderValue(t, 4, resp.Header.Get("X-Prometheus-Remote-Write-Written-Histograms")) + expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) + expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader)) if tc.appendExemplarErr != nil { - expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars")) + expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) } else { - expectHeaderValue(t, 2, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars")) + expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenExemplarsHeader)) } // Double check what was actually appended.