From 337f9ae7aeeef2761365244ff6e2fe02aa86ce62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Pazos?= Date: Tue, 26 Sep 2023 13:30:26 -0300 Subject: [PATCH] add new proto support on receiver end --- cmd/prometheus/main.go | 3 ++ storage/remote/write_handler.go | 23 ++++++++-- storage/remote/write_handler_test.go | 63 ++++++++++++++++++++++++---- web/api/v1/api.go | 3 +- web/web.go | 42 ++++++++++--------- 5 files changed, 102 insertions(+), 32 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a5cca8fa45..35affbb47a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -214,6 +214,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "reduced-rw-proto": c.rwProto = true level.Info(logger).Log("msg", "Reduced remote write proto format will be used, remote write receiver must be able to parse this new protobuf format.") + case "reduced-rw-proto-receiver": + c.web.EnableReducedWriteProtoReceiver = true + level.Info(logger).Log("msg", "Reduced proto format will be expected by the remote write receiver, client must send this new protobuf format.") default: level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o) } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 6c0cd8a29b..51a5418169 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -35,14 +35,18 @@ type writeHandler struct { appendable storage.Appendable samplesWithInvalidLabelsTotal prometheus.Counter + + // experimental feature, new remote write proto format + internFormat bool } // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. -func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler { +func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, internFormat bool) http.Handler { h := &writeHandler{ - logger: logger, - appendable: appendable, + logger: logger, + appendable: appendable, + internFormat: internFormat, samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "prometheus", @@ -58,7 +62,18 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st } func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - req, err := DecodeWriteRequest(r.Body) + var err error + var req *prompb.WriteRequest + if h.internFormat { + var redReq *prompb.WriteRequestWithRefs + redReq, err = DecodeReducedWriteRequest(r.Body) + if err == nil { + req, err = ReducedWriteRequestToWriteRequest(redReq) + } + } else { + req, err = DecodeWriteRequest(r.Body) + } + if err != nil { level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 27d0e9fabd..20059474e9 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -45,7 +45,7 @@ func TestRemoteWriteHandler(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{} - handler := NewWriteHandler(nil, nil, appendable) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -96,7 +96,7 @@ func TestOutOfOrderSample(t *testing.T) { appendable := &mockAppendable{ latestSample: 100, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -121,7 +121,7 @@ func TestOutOfOrderExemplar(t *testing.T) { appendable := &mockAppendable{ latestExemplar: 100, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -144,7 +144,8 @@ func TestOutOfOrderHistogram(t *testing.T) { appendable := &mockAppendable{ latestHistogram: 100, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -172,7 +173,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { } appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) recorder := httptest.NewRecorder() b.ResetTimer() @@ -191,7 +192,7 @@ func TestCommitErr(t *testing.T) { appendable := &mockAppendable{ commitErr: fmt.Errorf("commit error"), } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -217,7 +218,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) - handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head()) + handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), false) buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) require.NoError(b, err) @@ -262,6 +263,54 @@ func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries { return series } +func TestRemoteWriteHandlerReducedProtocol(t *testing.T) { + buf, _, err := buildReducedWriteRequest(writeRequestWithRefsFixture.Timeseries, writeRequestWithRefsFixture.StringSymbolTable, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(nil, nil, appendable, true) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusNoContent, resp.StatusCode) + + i := 0 + j := 0 + k := 0 + // the reduced write request is equivalent to the write request fixture. + // we can use it for + for _, ts := range writeRequestFixture.Timeseries { + labels := labelProtosToLabels(ts.Labels) + for _, s := range ts.Samples { + require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) + i++ + } + + for _, e := range ts.Exemplars { + exemplarLabels := labelProtosToLabels(e.Labels) + require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) + j++ + } + + for _, hp := range ts.Histograms { + if hp.IsFloatHistogram() { + fh := FloatHistogramProtoToFloatHistogram(hp) + require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) + } else { + h := HistogramProtoToHistogram(hp) + require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) + } + + k++ + } + } +} + type mockAppendable struct { latestSample int64 samples []mockSample diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 79569a657d..b37b93f06b 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -254,6 +254,7 @@ func NewAPI( statsRenderer StatsRenderer, rwEnabled bool, otlpEnabled bool, + remoteWriteReducedProto bool, ) *API { a := &API{ QueryEngine: qe, @@ -295,7 +296,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, remoteWriteReducedProto) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) diff --git a/web/web.go b/web/web.go index 7022db64ee..09164d955a 100644 --- a/web/web.go +++ b/web/web.go @@ -242,26 +242,27 @@ type Options struct { Version *PrometheusVersion Flags map[string]string - ListenAddress string - CORSOrigin *regexp.Regexp - ReadTimeout time.Duration - MaxConnections int - ExternalURL *url.URL - RoutePrefix string - UseLocalAssets bool - UserAssetsPath string - ConsoleTemplatesPath string - ConsoleLibrariesPath string - EnableLifecycle bool - EnableAdminAPI bool - PageTitle string - RemoteReadSampleLimit int - RemoteReadConcurrencyLimit int - RemoteReadBytesInFrame int - EnableRemoteWriteReceiver bool - EnableOTLPWriteReceiver bool - IsAgent bool - AppName string + ListenAddress string + CORSOrigin *regexp.Regexp + ReadTimeout time.Duration + MaxConnections int + ExternalURL *url.URL + RoutePrefix string + UseLocalAssets bool + UserAssetsPath string + ConsoleTemplatesPath string + ConsoleLibrariesPath string + EnableLifecycle bool + EnableAdminAPI bool + PageTitle string + RemoteReadSampleLimit int + RemoteReadConcurrencyLimit int + RemoteReadBytesInFrame int + EnableRemoteWriteReceiver bool + EnableOTLPWriteReceiver bool + IsAgent bool + AppName string + EnableReducedWriteProtoReceiver bool Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -352,6 +353,7 @@ func New(logger log.Logger, o *Options) *Handler { nil, o.EnableRemoteWriteReceiver, o.EnableOTLPWriteReceiver, + o.EnableReducedWriteProtoReceiver, ) if o.RoutePrefix != "/" {