add new proto support on receiver end

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-09-26 13:30:26 -03:00
parent 005ba7ac97
commit ab7c96a30e
5 changed files with 102 additions and 32 deletions

View file

@ -223,6 +223,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "reduced-rw-proto": case "reduced-rw-proto":
c.rwProto = true c.rwProto = true
level.Info(logger).Log("msg", "Reduced remote write proto format will be used, remote write receiver must be able to parse this new protobuf format.") level.Info(logger).Log("msg", "Reduced remote write proto format will be used, remote write receiver must be able to parse this new protobuf format.")
case "reduced-rw-proto-receiver":
c.web.EnableReducedWriteProtoReceiver = true
level.Info(logger).Log("msg", "Reduced proto format will be expected by the remote write receiver, client must send this new protobuf format.")
default: default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o) level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
} }

View file

@ -35,14 +35,18 @@ type writeHandler struct {
appendable storage.Appendable appendable storage.Appendable
samplesWithInvalidLabelsTotal prometheus.Counter samplesWithInvalidLabelsTotal prometheus.Counter
// experimental feature, new remote write proto format
internFormat bool
} }
// NewWriteHandler creates a http.Handler that accepts remote write requests and // NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable. // writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler { func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, internFormat bool) http.Handler {
h := &writeHandler{ h := &writeHandler{
logger: logger, logger: logger,
appendable: appendable, appendable: appendable,
internFormat: internFormat,
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus", Namespace: "prometheus",
@ -58,7 +62,18 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
} }
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := DecodeWriteRequest(r.Body) var err error
var req *prompb.WriteRequest
if h.internFormat {
var redReq *prompb.WriteRequestWithRefs
redReq, err = DecodeReducedWriteRequest(r.Body)
if err == nil {
req, err = ReducedWriteRequestToWriteRequest(redReq)
}
} else {
req, err = DecodeWriteRequest(r.Body)
}
if err != nil { if err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)

View file

@ -45,7 +45,7 @@ func TestRemoteWriteHandler(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(nil, nil, appendable) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -96,7 +96,7 @@ func TestOutOfOrderSample(t *testing.T) {
appendable := &mockAppendable{ appendable := &mockAppendable{
latestSample: 100, latestSample: 100,
} }
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -121,7 +121,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
appendable := &mockAppendable{ appendable := &mockAppendable{
latestExemplar: 100, latestExemplar: 100,
} }
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -144,7 +144,8 @@ func TestOutOfOrderHistogram(t *testing.T) {
appendable := &mockAppendable{ appendable := &mockAppendable{
latestHistogram: 100, latestHistogram: 100,
} }
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -172,7 +173,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
} }
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
b.ResetTimer() b.ResetTimer()
@ -191,7 +192,7 @@ func TestCommitErr(t *testing.T) {
appendable := &mockAppendable{ appendable := &mockAppendable{
commitErr: fmt.Errorf("commit error"), commitErr: fmt.Errorf("commit error"),
} }
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -217,7 +218,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close()) require.NoError(b, db.Close())
}) })
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head()) handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), false)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err) require.NoError(b, err)
@ -262,6 +263,54 @@ func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries {
return series return series
} }
func TestRemoteWriteHandlerReducedProtocol(t *testing.T) {
buf, _, err := buildReducedWriteRequest(writeRequestWithRefsFixture.Timeseries, writeRequestWithRefsFixture.StringSymbolTable, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(nil, nil, appendable, true)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
i := 0
j := 0
k := 0
// the reduced write request is equivalent to the write request fixture.
// we can use it for
for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(ts.Labels)
for _, s := range ts.Samples {
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
}
}
type mockAppendable struct { type mockAppendable struct {
latestSample int64 latestSample int64
samples []mockSample samples []mockSample

View file

@ -254,6 +254,7 @@ func NewAPI(
statsRenderer StatsRenderer, statsRenderer StatsRenderer,
rwEnabled bool, rwEnabled bool,
otlpEnabled bool, otlpEnabled bool,
remoteWriteReducedProto bool,
) *API { ) *API {
a := &API{ a := &API{
QueryEngine: qe, QueryEngine: qe,
@ -295,7 +296,7 @@ func NewAPI(
} }
if rwEnabled { if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap) a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, remoteWriteReducedProto)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)

View file

@ -241,26 +241,27 @@ type Options struct {
Version *PrometheusVersion Version *PrometheusVersion
Flags map[string]string Flags map[string]string
ListenAddress string ListenAddress string
CORSOrigin *regexp.Regexp CORSOrigin *regexp.Regexp
ReadTimeout time.Duration ReadTimeout time.Duration
MaxConnections int MaxConnections int
ExternalURL *url.URL ExternalURL *url.URL
RoutePrefix string RoutePrefix string
UseLocalAssets bool UseLocalAssets bool
UserAssetsPath string UserAssetsPath string
ConsoleTemplatesPath string ConsoleTemplatesPath string
ConsoleLibrariesPath string ConsoleLibrariesPath string
EnableLifecycle bool EnableLifecycle bool
EnableAdminAPI bool EnableAdminAPI bool
PageTitle string PageTitle string
RemoteReadSampleLimit int RemoteReadSampleLimit int
RemoteReadConcurrencyLimit int RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool EnableOTLPWriteReceiver bool
IsAgent bool IsAgent bool
AppName string AppName string
EnableReducedWriteProtoReceiver bool
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
Registerer prometheus.Registerer Registerer prometheus.Registerer
@ -351,6 +352,7 @@ func New(logger log.Logger, o *Options) *Handler {
nil, nil,
o.EnableRemoteWriteReceiver, o.EnableRemoteWriteReceiver,
o.EnableOTLPWriteReceiver, o.EnableOTLPWriteReceiver,
o.EnableReducedWriteProtoReceiver,
) )
if o.RoutePrefix != "/" { if o.RoutePrefix != "/" {