[PRW 2.0] Added Sender and RW Handler support for Response Stats. (#14444)

* [PRW 2.0] Added Sender support for Response Stats.

Chained on top of https://github.com/prometheus/prometheus/pull/14427
Fixes https://github.com/prometheus/prometheus/issues/14359

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: bwplotka <bwplotka@gmail.com>

* move write stats to it's own file

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Clean up header usage

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* add missing license to new stats file

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Addressed all comments.

Signed-off-by: bwplotka <bwplotka@gmail.com>

---------

Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2024-07-19 19:53:40 +02:00 committed by GitHub
parent ac85bd47e1
commit a60e5ce362
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 276 additions and 127 deletions

View file

@ -2,6 +2,9 @@
## unreleased
* [FEATURE] Remote-Write: Add sender and receiver support for [Remote Write 2.0-rc.2](https://prometheus.io/docs/specs/remote_write_spec_2_0/) specification #14395 #14427 #14444
* [ENHANCEMENT] Remote-Write: 1.x messages against Remote Write 2.x Receivers will have now correct values for `prometheus_storage_<samples|histograms|exemplar>_failed_total` in case of partial errors #14444
## 2.53.1 / 2024-07-10
Fix a bug which would drop samples in remote-write if the sending flow stalled

View file

@ -101,6 +101,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return successExitCode
}
// TODO(bwplotka): Add PRW 2.0 support.
func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool {
metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels)
if err != nil {
@ -116,7 +117,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s
// Encode the request body into snappy encoding.
compressed := snappy.Encode(nil, raw)
err = client.Store(context.Background(), compressed, 0)
_, err = client.Store(context.Background(), compressed, 0)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false

View file

@ -14,7 +14,6 @@
package remote
import (
"bufio"
"bytes"
"context"
"fmt"
@ -235,12 +234,12 @@ type RecoverableError struct {
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
// and encoded bytes from codec.go.
func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteResponseStats, error) {
httpReq, err := http.NewRequest(http.MethodPost, c.urlString, bytes.NewReader(req))
if err != nil {
// Errors from NewRequest are from unparsable URLs, so are not
// recoverable.
return err
return WriteResponseStats{}, err
}
httpReq.Header.Add("Content-Encoding", string(c.writeCompression))
@ -267,28 +266,34 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
if err != nil {
// Errors from Client.Do are from (for example) network errors, so are
// recoverable.
return RecoverableError{err, defaultBackoff}
return WriteResponseStats{}, RecoverableError{err, defaultBackoff}
}
defer func() {
io.Copy(io.Discard, httpResp.Body)
httpResp.Body.Close()
}()
// TODO(bwplotka): Pass logger and emit debug on error?
// Parsing error means there were some response header values we can't parse,
// we can continue handling.
rs, _ := ParseWriteResponseStats(httpResp)
//nolint:usestdlibvars
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
if httpResp.StatusCode/100 == 2 {
return rs, nil
}
// Handling errors e.g. read potential error in the body.
// TODO(bwplotka): Pass logger and emit debug on error?
body, _ := io.ReadAll(io.LimitReader(httpResp.Body, maxErrMsgLen))
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, body)
//nolint:usestdlibvars
if httpResp.StatusCode/100 == 5 ||
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
return rs, RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
}
return err
return rs, err
}
// retryAfterDuration returns the duration for the Retry-After header. In case of any errors, it

View file

@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
c, err := NewWriteClient(hash, conf)
require.NoError(t, err)
err = c.Store(context.Background(), []byte{}, 0)
_, err = c.Store(context.Background(), []byte{}, 0)
if test.err != nil {
require.EqualError(t, err, test.err.Error())
} else {
@ -133,7 +133,7 @@ func TestClientRetryAfter(t *testing.T) {
c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))
var recErr RecoverableError
err = c.Store(context.Background(), []byte{}, 0)
_, err = c.Store(context.Background(), []byte{}, 0)
require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
if tc.expectedRecoverable {
require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter)
@ -169,7 +169,7 @@ func TestRetryAfterDuration(t *testing.T) {
}
}
func TestClientHeaders(t *testing.T) {
func TestClientCustomHeaders(t *testing.T) {
headersToSend := map[string]string{"Foo": "Bar", "Baz": "qux"}
var called bool
@ -203,7 +203,7 @@ func TestClientHeaders(t *testing.T) {
c, err := NewWriteClient("c", conf)
require.NoError(t, err)
err = c.Store(context.Background(), []byte{}, 0)
_, err = c.Store(context.Background(), []byte{}, 0)
require.NoError(t, err)
require.True(t, called, "The remote server wasn't called")

View file

@ -391,7 +391,7 @@ func (m *queueManagerMetrics) unregister() {
// external timeseries database.
type WriteClient interface {
// Store stores the given samples in the remote storage.
Store(ctx context.Context, req []byte, retryAttempt int) error
Store(ctx context.Context, req []byte, retryAttempt int) (WriteResponseStats, error)
// Name uniquely identifies the remote storage.
Name() string
// Endpoint is the remote read or write endpoint for the storage client.
@ -597,14 +597,15 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
}
begin := time.Now()
err := t.storeClient.Store(ctx, req, try)
// Ignoring WriteResponseStats, because there is nothing for metadata, since it's
// embedded in v2 calls now, and we do v1 here.
_, err := t.storeClient.Store(ctx, req, try)
t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
if err != nil {
span.RecordError(err)
return err
}
return nil
}
@ -1661,8 +1662,8 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, rs, time.Since(begin))
return err
}
@ -1670,17 +1671,29 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// See https://github.com/prometheus/prometheus/issues/14409
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
begin := time.Now()
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, rs, time.Since(begin))
return err
}
func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, rs WriteResponseStats, duration time.Duration) {
// Partial errors may happen -- account for that.
sampleDiff := sampleCount - rs.Samples
if sampleDiff > 0 {
s.qm.metrics.failedSamplesTotal.Add(float64(sampleDiff))
}
histogramDiff := histogramCount - rs.Histograms
if histogramDiff > 0 {
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramDiff))
}
exemplarDiff := exemplarCount - rs.Exemplars
if exemplarDiff > 0 {
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarDiff))
}
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "histogramCount", histogramCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount))
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "failedSampleCount", sampleDiff, "failedHistogramCount", histogramDiff, "failedExemplarCount", exemplarDiff, "err", err)
} else if sampleDiff+exemplarDiff+histogramDiff > 0 {
level.Error(s.qm.logger).Log("msg", "we got 2xx status code from the Receiver yet statistics indicate some dat was not written; investigation needed", "failedSampleCount", sampleDiff, "failedHistogramCount", histogramDiff, "failedExemplarCount", exemplarDiff)
}
// These counters are used to calculate the dynamic sharding, and as such
@ -1688,6 +1701,7 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl
s.qm.dataOut.incr(int64(sampleCount + exemplarCount + histogramCount + metadataCount))
s.qm.dataOutDuration.incr(int64(duration))
s.qm.lastSendTimestamp.Store(time.Now().Unix())
// Pending samples/exemplars/histograms also should be subtracted, as an error means
// they will not be retried.
s.qm.metrics.pendingSamples.Sub(float64(sampleCount))
@ -1699,19 +1713,29 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) (WriteResponseStats, error) {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
return WriteResponseStats{}, err
}
reqSize := len(req)
*buf = req
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
accumulatedStats := WriteResponseStats{}
var accumulatedStatsMu sync.Mutex
addStats := func(rs WriteResponseStats) {
accumulatedStatsMu.Lock()
accumulatedStats = accumulatedStats.Add(rs)
accumulatedStatsMu.Unlock()
}
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
@ -1759,15 +1783,19 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
err := s.qm.client().Store(ctx, *buf, try)
// Technically for v1, we will likely have empty response stats, but for
// newer Receivers this might be not, so used it in a best effort.
rs, err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs)
if err != nil {
span.RecordError(err)
return err
if err == nil {
return nil
}
return nil
span.RecordError(err)
return err
}
onRetry := func() {
@ -1780,29 +1808,48 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
return err
return accumulatedStats, err
}
s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return err
if err == nil && !accumulatedStats.Confirmed {
// No 2.0 response headers, and we sent v1 message, so likely it's 1.0 Receiver.
// Assume success, don't rely on headers.
return WriteResponseStats{
Samples: sampleCount,
Histograms: histogramCount,
Exemplars: exemplarCount,
}, nil
}
return accumulatedStats, err
}
// sendV2Samples to the remote storage with backoff for recoverable errors.
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) (WriteResponseStats, error) {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
return WriteResponseStats{}, err
}
reqSize := len(req)
*buf = req
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
accumulatedStats := WriteResponseStats{}
var accumulatedStatsMu sync.Mutex
addStats := func(rs WriteResponseStats) {
accumulatedStatsMu.Lock()
accumulatedStats = accumulatedStats.Add(rs)
accumulatedStatsMu.Unlock()
}
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
@ -1850,15 +1897,28 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
err := s.qm.client().Store(ctx, *buf, try)
rs, err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs)
if err != nil {
span.RecordError(err)
return err
if err == nil {
// Check the case mentioned in PRW 2.0
// https://prometheus.io/docs/specs/remote_write_spec_2_0/#required-written-response-headers.
if sampleCount+histogramCount+exemplarCount > 0 && rs.NoDataWritten() {
err = fmt.Errorf("sent v2 request with %v samples, %v histograms and %v exemplars; got 2xx, but PRW 2.0 response header statistics indicate %v samples, %v histograms and %v exemplars were accepted;"+
" assumining failure e.g. the target only supports PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly",
sampleCount, histogramCount, exemplarCount,
rs.Samples, rs.Histograms, rs.Exemplars,
)
span.RecordError(err)
return err
}
return nil
}
return nil
span.RecordError(err)
return err
}
onRetry := func() {
@ -1871,13 +1931,12 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
return err
return accumulatedStats, err
}
s.qm.metrics.sentBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return err
return accumulatedStats, err
}
func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) {

View file

@ -118,10 +118,10 @@ func TestBasicContentNegotiation(t *testing.T) {
expectFail: true,
},
{
name: "v2 talks to v1 that tries to unmarshal v2 payload with v1 proto",
name: "v2 talks to (broken) v1 that tries to unmarshal v2 payload with v1 proto",
senderProtoMsg: config.RemoteWriteProtoMsgV2, receiverProtoMsg: config.RemoteWriteProtoMsgV1,
injectErrs: []error{nil},
expectFail: true, // invalid request, no timeseries
expectFail: true, // We detect this thanks to https://github.com/prometheus/prometheus/issues/14359
},
// Opposite, v1 talking to v2 only server.
{
@ -130,12 +130,6 @@ func TestBasicContentNegotiation(t *testing.T) {
injectErrs: []error{errors.New("pretend unrecoverable err")},
expectFail: true,
},
{
name: "v1 talks to (broken) v2 that tries to unmarshal v1 payload with v2 proto",
senderProtoMsg: config.RemoteWriteProtoMsgV1, receiverProtoMsg: config.RemoteWriteProtoMsgV2,
injectErrs: []error{nil},
expectFail: true, // invalid request, no timeseries
},
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
@ -182,7 +176,6 @@ func TestBasicContentNegotiation(t *testing.T) {
if !tc.expectFail {
// No error expected, so wait for data.
c.waitForExpectedData(t, 5*time.Second)
require.Equal(t, 1, c.writesReceived)
require.Equal(t, 0.0, client_testutil.ToFloat64(qm.metrics.failedSamplesTotal))
} else {
// Wait for failure to be recorded in metrics.
@ -190,11 +183,10 @@ func TestBasicContentNegotiation(t *testing.T) {
defer cancel()
require.NoError(t, runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
if client_testutil.ToFloat64(qm.metrics.failedSamplesTotal) != 1.0 {
return errors.New("expected one sample failed in qm metrics")
return fmt.Errorf("expected one sample failed in qm metrics; got %v", client_testutil.ToFloat64(qm.metrics.failedSamplesTotal))
}
return nil
}))
require.Equal(t, 0, c.writesReceived)
}
// samplesTotal means attempts.
@ -764,10 +756,10 @@ func TestDisableReshardOnRetry(t *testing.T) {
metrics = newQueueManagerMetrics(nil, "", "")
client = &MockWriteClient{
StoreFunc: func(ctx context.Context, b []byte, i int) error {
StoreFunc: func(ctx context.Context, b []byte, i int) (WriteResponseStats, error) {
onStoreCalled()
return RecoverableError{
return WriteResponseStats{}, RecoverableError{
error: fmt.Errorf("fake error"),
retryAfter: model.Duration(retryAfter),
}
@ -1113,14 +1105,14 @@ func (c *TestWriteClient) SetReturnError(err error) {
c.returnError = err
}
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResponseStats, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.storeWait > 0 {
time.Sleep(c.storeWait)
}
if c.returnError != nil {
return c.returnError
return WriteResponseStats{}, c.returnError
}
// nil buffers are ok for snappy, ignore cast error.
if c.buf != nil {
@ -1130,14 +1122,14 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
reqBuf, err := snappy.Decode(c.buf, req)
c.buf = reqBuf
if err != nil {
return err
return WriteResponseStats{}, err
}
// Check if we've been told to inject err for this call.
if len(c.injectedErrs) > 0 {
c.currErr++
if err = c.injectedErrs[c.currErr]; err != nil {
return err
return WriteResponseStats{}, err
}
}
@ -1156,13 +1148,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
}
}
if err != nil {
return err
}
if len(reqProto.Timeseries) == 0 && len(reqProto.Metadata) == 0 {
return errors.New("invalid request, no timeseries")
return WriteResponseStats{}, err
}
rs := WriteResponseStats{}
b := labels.NewScratchBuilder(0)
for _, ts := range reqProto.Timeseries {
labels := ts.ToLabels(&b, nil)
@ -1170,10 +1159,12 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
if len(ts.Samples) > 0 {
c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...)
}
rs.Samples += len(ts.Samples)
if len(ts.Exemplars) > 0 {
c.receivedExemplars[tsID] = append(c.receivedExemplars[tsID], ts.Exemplars...)
}
rs.Exemplars += len(ts.Exemplars)
for _, h := range ts.Histograms {
if h.IsFloatHistogram() {
@ -1182,13 +1173,14 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
c.receivedHistograms[tsID] = append(c.receivedHistograms[tsID], h)
}
}
rs.Histograms += len(ts.Histograms)
}
for _, m := range reqProto.Metadata {
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
}
c.writesReceived++
return nil
return rs, nil
}
func (c *TestWriteClient) Name() string {
@ -1256,10 +1248,10 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient {
return &TestBlockingWriteClient{}
}
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error {
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) (WriteResponseStats, error) {
c.numCalls.Inc()
<-ctx.Done()
return nil
return WriteResponseStats{}, nil
}
func (c *TestBlockingWriteClient) NumCalls() uint64 {
@ -1278,19 +1270,19 @@ func (c *TestBlockingWriteClient) Endpoint() string {
type NopWriteClient struct{}
func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
func (c *NopWriteClient) Store(context.Context, []byte, int) error {
return nil
func (c *NopWriteClient) Store(context.Context, []byte, int) (WriteResponseStats, error) {
return WriteResponseStats{}, nil
}
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
type MockWriteClient struct {
StoreFunc func(context.Context, []byte, int) error
StoreFunc func(context.Context, []byte, int) (WriteResponseStats, error)
NameFunc func() string
EndpointFunc func() string
}
func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) error {
func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) (WriteResponseStats, error) {
return c.StoreFunc(ctx, bb, n)
}
func (c *MockWriteClient) Name() string { return c.NameFunc() }

107
storage/remote/stats.go Normal file
View file

@ -0,0 +1,107 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"errors"
"net/http"
"strconv"
)
const (
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
)
// WriteResponseStats represents the response write statistics specified in https://github.com/prometheus/docs/pull/2486
type WriteResponseStats struct {
// Samples represents X-Prometheus-Remote-Write-Written-Samples
Samples int
// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
Histograms int
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
Exemplars int
// Confirmed means we can trust those statistics from the point of view
// of the PRW 2.0 spec. When parsed from headers, it means we got at least one
// response header from the Receiver to confirm those numbers, meaning it must
// be a at least 2.0 Receiver. See ParseWriteResponseStats for details.
Confirmed bool
}
// NoDataWritten returns true if statistics indicate no data was written.
func (s WriteResponseStats) NoDataWritten() bool {
return (s.Samples + s.Histograms + s.Exemplars) == 0
}
// AllSamples returns both float and histogram sample numbers.
func (s WriteResponseStats) AllSamples() int {
return s.Samples + s.Histograms
}
// Add returns the sum of this WriteResponseStats plus the given WriteResponseStats.
func (s WriteResponseStats) Add(rs WriteResponseStats) WriteResponseStats {
s.Confirmed = rs.Confirmed
s.Samples += rs.Samples
s.Histograms += rs.Histograms
s.Exemplars += rs.Exemplars
return s
}
// SetHeaders sets response headers in a given response writer.
// Make sure to use it before http.ResponseWriter.WriteHeader and .Write.
func (s WriteResponseStats) SetHeaders(w http.ResponseWriter) {
h := w.Header()
h.Set(rw20WrittenSamplesHeader, strconv.Itoa(s.Samples))
h.Set(rw20WrittenHistogramsHeader, strconv.Itoa(s.Histograms))
h.Set(rw20WrittenExemplarsHeader, strconv.Itoa(s.Exemplars))
}
// ParseWriteResponseStats returns WriteResponseStats parsed from the response headers.
//
// As per 2.0 spec, missing header means 0. However, abrupt HTTP errors, 1.0 Receivers
// or buggy 2.0 Receivers might result in no response headers specified and that
// might NOT necessarily mean nothing was written. To represent that we set
// s.Confirmed = true only when see at least on response header.
//
// Error is returned when any of the header fails to parse as int64.
func ParseWriteResponseStats(r *http.Response) (s WriteResponseStats, err error) {
var (
errs []error
h = r.Header
)
if v := h.Get(rw20WrittenSamplesHeader); v != "" { // Empty means zero.
s.Confirmed = true
if s.Samples, err = strconv.Atoi(v); err != nil {
s.Samples = 0
errs = append(errs, err)
}
}
if v := h.Get(rw20WrittenHistogramsHeader); v != "" { // Empty means zero.
s.Confirmed = true
if s.Histograms, err = strconv.Atoi(v); err != nil {
s.Histograms = 0
errs = append(errs, err)
}
}
if v := h.Get(rw20WrittenExemplarsHeader); v != "" { // Empty means zero.
s.Confirmed = true
if s.Exemplars, err = strconv.Atoi(v); err != nil {
s.Exemplars = 0
errs = append(errs, err)
}
}
return s, errors.Join(errs...)
}

View file

@ -19,7 +19,6 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
@ -201,7 +200,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
respStats, errHTTPCode, err := h.writeV2(r.Context(), &req)
// Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases.
respStats.SetResponseHeaders(w.Header())
respStats.SetHeaders(w)
if err != nil {
if errHTTPCode/5 == 100 { // 5xx
@ -318,24 +317,6 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist
return nil
}
const (
prw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Written-Samples"
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Written-Histograms"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Written-Exemplars"
)
type responseStats struct {
samples int
histograms int
exemplars int
}
func (s responseStats) SetResponseHeaders(h http.Header) {
h.Set(prw20WrittenSamplesHeader, strconv.Itoa(s.samples))
h.Set(rw20WrittenHistogramsHeader, strconv.Itoa(s.histograms))
h.Set(rw20WrittenExemplarsHeader, strconv.Itoa(s.exemplars))
}
// writeV2 is similar to write, but it works with v2 proto message,
// allows partial 4xx writes and gathers statistics.
//
@ -345,14 +326,14 @@ func (s responseStats) SetResponseHeaders(h http.Header) {
//
// NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors.
// Once we have 5xx type of error, we immediately stop and rollback all appends.
func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ responseStats, errHTTPCode int, _ error) {
func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) {
app := &timeLimitAppender{
Appender: h.appendable.Appender(ctx),
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
rs := responseStats{}
samplesWithoutMetadata, errHTTPCode, err := h.appendV2(app, req, &rs)
s := WriteResponseStats{}
samplesWithoutMetadata, errHTTPCode, err := h.appendV2(app, req, &s)
if err != nil {
if errHTTPCode/5 == 100 {
// On 5xx, we always rollback, because we expect
@ -360,29 +341,29 @@ func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ res
if rerr := app.Rollback(); rerr != nil {
level.Error(h.logger).Log("msg", "writev2 rollback failed on retry-able error", "err", rerr)
}
return responseStats{}, errHTTPCode, err
return WriteResponseStats{}, errHTTPCode, err
}
// Non-retriable (e.g. bad request error case). Can be partially written.
commitErr := app.Commit()
if commitErr != nil {
// Bad requests does not matter as we have internal error (retryable).
return responseStats{}, http.StatusInternalServerError, commitErr
return WriteResponseStats{}, http.StatusInternalServerError, commitErr
}
// Bad request error happened, but rest of data (if any) was written.
h.samplesAppendedWithoutMetadata.Add(float64(samplesWithoutMetadata))
return rs, errHTTPCode, err
return s, errHTTPCode, err
}
// All good just commit.
if err := app.Commit(); err != nil {
return responseStats{}, http.StatusInternalServerError, err
return WriteResponseStats{}, http.StatusInternalServerError, err
}
h.samplesAppendedWithoutMetadata.Add(float64(samplesWithoutMetadata))
return rs, 0, nil
return s, 0, nil
}
func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *responseStats) (samplesWithoutMetadata, errHTTPCode int, err error) {
func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *WriteResponseStats) (samplesWithoutMetadata, errHTTPCode int, err error) {
var (
badRequestErrs []error
outOfOrderExemplarErrs, samplesWithInvalidLabels int
@ -400,14 +381,14 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
continue
}
allSamplesSoFar := rs.samples + rs.histograms
allSamplesSoFar := rs.AllSamples()
var ref storage.SeriesRef
// Samples.
for _, s := range ts.Samples {
ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue())
if err == nil {
rs.samples++
rs.Samples++
continue
}
// Handle append error.
@ -431,7 +412,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, hp.ToIntHistogram(), nil)
}
if err == nil {
rs.histograms++
rs.Histograms++
continue
}
// Handle append error.
@ -453,18 +434,19 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
e := ep.ToExemplar(&b, req.Symbols)
ref, err = app.AppendExemplar(ref, ls, e)
if err == nil {
rs.exemplars++
rs.Exemplars++
continue
}
// Handle append error.
// TODO(bwplotka): I left the logic as in v1, but we might want to make it consistent with samples and histograms.
// Since exemplar storage is still experimental, we don't fail in anyway, the request on ingestion errors.
if errors.Is(err, storage.ErrOutOfOrderExemplar) {
outOfOrderExemplarErrs++
level.Debug(h.logger).Log("msg", "Out of order exemplar", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e))
outOfOrderExemplarErrs++ // Maintain old metrics, but technically not needed, given we fail here.
level.Error(h.logger).Log("msg", "Out of order exemplar", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e))
badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String()))
continue
}
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AppendExemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err)
// TODO(bwplotka): Add strict mode which would trigger rollback of everything if needed.
// For now we keep the previously released flow (just error not debug leve) of dropping them without rollback and 5xx.
level.Error(h.logger).Log("msg", "failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e))
}
m := ts.ToMetadata(req.Symbols)
@ -472,7 +454,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
// Metadata is attached to each series, so since Prometheus does not reject sample without metadata information,
// we don't report remote write error either. We increment metric instead.
samplesWithoutMetadata += (rs.samples + rs.histograms) - allSamplesSoFar
samplesWithoutMetadata += rs.AllSamples() - allSamplesSoFar
}
}

View file

@ -398,7 +398,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
{
desc: "Partial write; skipped exemplar; exemplar storage errs are noop",
input: writeV2RequestFixture.Timeseries,
appendExemplarErr: errors.New("some exemplar append error"),
appendExemplarErr: errors.New("some exemplar internal append error"),
expectedCode: http.StatusNoContent,
},
@ -449,9 +449,9 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
if tc.expectedCode == http.StatusInternalServerError {
// We don't expect writes for partial writes with retry-able code.
expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Samples"))
expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Histograms"))
expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars"))
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenSamplesHeader))
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenHistogramsHeader))
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader))
require.Empty(t, len(appendable.samples))
require.Empty(t, len(appendable.histograms))
@ -462,12 +462,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
// Double check mandatory 2.0 stats.
// writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each.
expectHeaderValue(t, 2, resp.Header.Get("X-Prometheus-Remote-Write-Written-Samples"))
expectHeaderValue(t, 4, resp.Header.Get("X-Prometheus-Remote-Write-Written-Histograms"))
expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader))
expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader))
if tc.appendExemplarErr != nil {
expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars"))
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader))
} else {
expectHeaderValue(t, 2, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars"))
expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenExemplarsHeader))
}
// Double check what was actually appended.