add flag to override remote write header value

Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>
This commit is contained in:
Alex Greenbank 2024-04-11 14:38:34 +00:00
parent ad77987bdc
commit 5f90eb6abe
8 changed files with 149 additions and 41 deletions

View file

@ -158,7 +158,9 @@ type flagConfig struct {
enablePerStepStats bool
enableAutoGOMAXPROCS bool
// todo: how to use the enable feature flag properly + use the remote format enum type
rwFormat int
rwFormat int
// Allow the header value sent to clients to be overridden during rollout/rollback
rwFormatHeaderAdvertise string
enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool
@ -457,6 +459,9 @@ func main() {
a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)").
Default("0").IntVar(&cfg.rwFormat)
a.Flag("remote-write-header-advertise-overide", "remote write header to send back for rollout/rollback phases").
Default("").StringVar(&cfg.rwFormatHeaderAdvertise)
promlogflag.AddFlags(a, &cfg.promlogConfig)
a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {

View file

@ -56,6 +56,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, metadata-wal-records. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--remote-write-format</code> | remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format) | `0` |
| <code class="text-nowrap">--remote-write-header-advertise-overide</code> | remote write header to send back for rollout/rollback phases | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

View file

@ -44,10 +44,16 @@ const (
RemoteWriteVersion20HeaderValue = "2.0"
)
func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
func rwHeaderNameValues(rwFormat config.RemoteWriteFormat, rwFormatHeaderAdvertise string) map[string]string {
// Return the correct remote write header name/values based on provided rwFormat.
ret := make(map[string]string, 1)
if rwFormatHeaderAdvertise != "" {
// If we have an override set then we return that
ret[RemoteWriteVersionHeader] = rwFormatHeaderAdvertise
return ret
}
switch rwFormat {
case Version1:
ret[RemoteWriteVersionHeader] = RemoteWriteVersion1HeaderValue
@ -70,13 +76,15 @@ type writeHeadHandler struct {
// Experimental feature, new remote write proto format.
// The handler will accept the new format, but it can still accept the old one.
rwFormat config.RemoteWriteFormat
rwFormat config.RemoteWriteFormat
rwFormatHeaderAdvertise string
}
func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat) http.Handler {
func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat, rwFormatHeaderAdvertise string) http.Handler {
h := &writeHeadHandler{
logger: logger,
rwFormat: rwFormat,
logger: logger,
rwFormat: rwFormat,
rwFormatHeaderAdvertise: rwFormatHeaderAdvertise,
remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "api",
@ -94,7 +102,7 @@ func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Send a response to the HEAD request based on the format supported.
// Add appropriate header values for the specific rwFormat.
for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
for hName, hValue := range rwHeaderNameValues(h.rwFormat, h.rwFormatHeaderAdvertise) {
w.Header().Set(hName, hValue)
}
@ -109,16 +117,18 @@ type writeHandler struct {
// Experimental feature, new remote write proto format.
// The handler will accept the new format, but it can still accept the old one.
rwFormat config.RemoteWriteFormat
rwFormat config.RemoteWriteFormat
rwFormatHeaderAdvertise string
}
// NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat config.RemoteWriteFormat) http.Handler {
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat config.RemoteWriteFormat, rwFormatHeaderAdvertise string) http.Handler {
h := &writeHandler{
logger: logger,
appendable: appendable,
rwFormat: rwFormat,
logger: logger,
appendable: appendable,
rwFormat: rwFormat,
rwFormatHeaderAdvertise: rwFormatHeaderAdvertise,
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "api",
@ -136,7 +146,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error
// Set the header(s) in the response based on the rwFormat the server supports.
for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
for hName, hValue := range rwHeaderNameValues(h.rwFormat, h.rwFormatHeaderAdvertise) {
w.Header().Set(hName, hValue)
}

View file

@ -40,7 +40,8 @@ import (
)
func TestRemoteWriteHeadHandler(t *testing.T) {
handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2)
// HEAD request to a Version2 server with no override = 200 and "2.0/snappy"
handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2, "")
req, err := http.NewRequest(http.MethodHead, "", nil)
require.NoError(t, err)
@ -56,6 +57,24 @@ func TestRemoteWriteHeadHandler(t *testing.T) {
require.Equal(t, "2.0;snappy,0.1.0", protHeader)
}
func TestRemoteWriteHeadHandlerOverride(t *testing.T) {
// HEAD request to a Version2 server with an override of "0.1.0" = 200 and "0.1.0"
handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2, "0.1.0")
req, err := http.NewRequest(http.MethodHead, "", nil)
require.NoError(t, err)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusOK, resp.StatusCode)
// Check header is expected value.
protHeader := resp.Header.Get(RemoteWriteVersionHeader)
require.Equal(t, "0.1.0", protHeader)
}
func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
// Send a v2 request without a "Content-Encoding:" header -> 406.
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
@ -68,7 +87,7 @@ func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -89,7 +108,7 @@ func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -109,7 +128,7 @@ func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -127,7 +146,7 @@ func TestRemoteWriteHandler(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -169,6 +188,65 @@ func TestRemoteWriteHandler(t *testing.T) {
}
}
func TestRemoteWriteHandlerMinimizedFormatOverride(t *testing.T) {
// As TestRemoteWriteHandlerMinimizedFormat but we send a v2 request to a Version2 server that is advertising it only
// does "0.1.0" (but will accept "2.0/snappy" obviously).
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
// Must provide "Content-Encoding: snappy" header.
req.Header.Set("Content-Encoding", "snappy")
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2, "0.1.0")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
// Check header is expected value.
protHeader := resp.Header.Get(RemoteWriteVersionHeader)
require.Equal(t, "0.1.0", protHeader)
i := 0
j := 0
k := 0
// the reduced write request is equivalent to the write request fixture.
// we can use it for
for _, ts := range writeRequestMinimizedFixture.Timeseries {
ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols)
for _, s := range ts.Samples {
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosV2ToLabels(e.LabelsRefs, writeRequestMinimizedFixture.Symbols)
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoV2ToFloatHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoV2ToHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
// todo: check for metadata
}
}
func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
require.NoError(t, err)
@ -180,7 +258,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -240,7 +318,7 @@ func TestOutOfOrderSample(t *testing.T) {
latestSample: 100,
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -266,7 +344,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
latestExemplar: 100,
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -290,7 +368,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
latestHistogram: 100,
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -319,7 +397,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
appendable := &mockAppendable{}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1, "")
recorder := httptest.NewRecorder()
b.ResetTimer()
@ -339,7 +417,7 @@ func TestCommitErr(t *testing.T) {
commitErr: fmt.Errorf("commit error"),
}
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1, "")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -366,7 +444,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close())
})
// TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Version1)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Version1, "")
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
require.NoError(b, err)

View file

@ -255,6 +255,7 @@ func NewAPI(
statsRenderer StatsRenderer,
rwEnabled bool,
rwFormat config.RemoteWriteFormat,
rwFormatHeaderAdvertise string,
otlpEnabled bool,
) *API {
a := &API{
@ -309,8 +310,8 @@ func NewAPI(
// 1. (During) support new protocols but don't advertise
// <wait a suitable period for all sending clients to be aware that receiving servers no longer support 2.0>
// 2. (After) no flags set
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat)
a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, rwFormat)
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat, rwFormatHeaderAdvertise)
a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, rwFormat, rwFormatHeaderAdvertise)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)

View file

@ -339,27 +339,37 @@ var sampleFlagMap = map[string]string{
func TestHeadEndpoint(t *testing.T) {
for _, tc := range []struct {
name string
rwFormat config.RemoteWriteFormat
expectedStatusCode int
expectedHeaderValue string
name string
rwFormat config.RemoteWriteFormat
rwFormatHeaderAdvertise string
expectedStatusCode int
expectedHeaderValue string
}{
{
name: "HEAD Version 1",
rwFormat: remote.Version1,
expectedStatusCode: http.StatusOK,
expectedHeaderValue: "0.1.0",
name: "HEAD Version 1",
rwFormat: remote.Version1,
rwFormatHeaderAdvertise: "",
expectedStatusCode: http.StatusOK,
expectedHeaderValue: "0.1.0",
},
{
name: "HEAD Version 2",
rwFormat: remote.Version2,
expectedStatusCode: http.StatusOK,
expectedHeaderValue: "2.0;snappy,0.1.0",
name: "HEAD Version 2",
rwFormat: remote.Version2,
rwFormatHeaderAdvertise: "",
expectedStatusCode: http.StatusOK,
expectedHeaderValue: "2.0;snappy,0.1.0",
},
{
name: "HEAD Version 2",
rwFormat: remote.Version2,
rwFormatHeaderAdvertise: "0.1.0",
expectedStatusCode: http.StatusOK,
expectedHeaderValue: "0.1.0",
},
} {
r := route.New()
api := &API{
remoteWriteHeadHandler: remote.NewWriteHeadHandler(log.NewNopLogger(), nil, tc.rwFormat),
remoteWriteHeadHandler: remote.NewWriteHeadHandler(log.NewNopLogger(), nil, tc.rwFormat, tc.rwFormatHeaderAdvertise),
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
}
api.Register(r)

View file

@ -137,6 +137,7 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
nil,
false,
remote.Version1,
"",
false, // Disable experimental reduce remote write proto support.
)

View file

@ -262,7 +262,8 @@ type Options struct {
IsAgent bool
AppName string
// TODO(cstyan): should this change to a list of tuples, maybe via the content negotiation PR?
RemoteWriteFormat config.RemoteWriteFormat
RemoteWriteFormat config.RemoteWriteFormat
RemoteWriteFormatHeaderAdvertise string
Gatherer prometheus.Gatherer
Registerer prometheus.Registerer
@ -353,6 +354,7 @@ func New(logger log.Logger, o *Options) *Handler {
nil,
o.EnableRemoteWriteReceiver,
o.RemoteWriteFormat,
o.RemoteWriteFormatHeaderAdvertise,
o.EnableOTLPWriteReceiver,
)