[PRW-2.0] (chain1) New Remote Write 2.0 Config options for 2.0-rc.1 spec. (#14335)

NOTE: For simple review this change does not touch remote/ packages, only main and configs.

Spec: https://prometheus.io/docs/specs/remote_write_spec_2_0

Supersedes https://github.com/prometheus/prometheus/pull/13968

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2024-06-25 08:15:27 +02:00 committed by GitHub
parent 0c0883e3cb
commit 2b348d43b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 167 additions and 63 deletions

View file

@ -158,8 +158,6 @@ type flagConfig struct {
enableNewSDManager bool enableNewSDManager bool
enablePerStepStats bool enablePerStepStats bool
enableAutoGOMAXPROCS bool enableAutoGOMAXPROCS bool
// todo: how to use the enable feature flag properly + use the remote format enum type
rwFormat int
enableAutoGOMEMLIMIT bool enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool enableConcurrentRuleEval bool
@ -311,9 +309,15 @@ func main() {
a.Flag("web.enable-admin-api", "Enable API endpoints for admin control actions."). a.Flag("web.enable-admin-api", "Enable API endpoints for admin control actions.").
Default("false").BoolVar(&cfg.web.EnableAdminAPI) Default("false").BoolVar(&cfg.web.EnableAdminAPI)
// TODO(bwplotka): Moveo all remote receive flags to config.
// See https://github.com/prometheus/prometheus/pull/13968/files#r1577035002
a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests."). a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests.").
Default("false").BoolVar(&cfg.web.EnableRemoteWriteReceiver) Default("false").BoolVar(&cfg.web.EnableRemoteWriteReceiver)
supportedRemoteWriteProtoMsgs := config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2}
a.Flag("web.remote-write-receiver.accepted-protobuf-messages", fmt.Sprintf("List of the remote write protobuf messages to accept when receiving the remote writes. Supported values: %v", supportedRemoteWriteProtoMsgs.String())).
Default(supportedRemoteWriteProtoMsgs.Strings()...).SetValue(rwProtoMsgFlagValue(&cfg.web.AcceptRemoteWriteProtoMsgs))
a.Flag("web.console.templates", "Path to the console template directory, available at /consoles."). a.Flag("web.console.templates", "Path to the console template directory, available at /consoles.").
Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath) Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath)
@ -455,9 +459,6 @@ func main() {
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList) Default("").StringsVar(&cfg.featureList)
a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)").
Default("0").IntVar(&cfg.rwFormat)
promlogflag.AddFlags(a, &cfg.promlogConfig) promlogflag.AddFlags(a, &cfg.promlogConfig)
a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error { a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {
@ -820,7 +821,6 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String() cfg.web.Flags[f.Name] = f.Value.String()
} }
cfg.web.RemoteWriteFormat = config.RemoteWriteFormat(cfg.rwFormat)
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager. // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
@ -1737,3 +1737,33 @@ type discoveryManager interface {
Run() error Run() error
SyncCh() <-chan map[string][]*targetgroup.Group SyncCh() <-chan map[string][]*targetgroup.Group
} }
// TODO(bwplotka): Add unit test.
type rwProtoMsgFlagParser struct {
msgs *[]config.RemoteWriteProtoMsg
}
func rwProtoMsgFlagValue(msgs *[]config.RemoteWriteProtoMsg) kingpin.Value {
return &rwProtoMsgFlagParser{msgs: msgs}
}
func (p *rwProtoMsgFlagParser) IsCumulative() bool {
return true
}
func (p *rwProtoMsgFlagParser) String() string {
ss := make([]string, 0, len(*p.msgs))
for _, t := range *p.msgs {
ss = append(ss, string(t))
}
return strings.Join(ss, ",")
}
func (p *rwProtoMsgFlagParser) Set(opt string) error {
t := config.RemoteWriteProtoMsg(opt)
if err := t.Validate(); err != nil {
return err
}
*p.msgs = append(*p.msgs, t)
return nil
}

View file

@ -173,6 +173,7 @@ var (
// DefaultRemoteWriteConfig is the default remote write configuration. // DefaultRemoteWriteConfig is the default remote write configuration.
DefaultRemoteWriteConfig = RemoteWriteConfig{ DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second), RemoteTimeout: model.Duration(30 * time.Second),
ProtobufMessage: RemoteWriteProtoMsgV1,
QueueConfig: DefaultQueueConfig, QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig, MetadataConfig: DefaultMetadataConfig,
HTTPClientConfig: config.DefaultHTTPClientConfig, HTTPClientConfig: config.DefaultHTTPClientConfig,
@ -271,7 +272,7 @@ func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
jobNames := map[string]string{} jobNames := map[string]string{}
for i, scfg := range c.ScrapeConfigs { for i, scfg := range c.ScrapeConfigs {
// We do these checks for library users that would not call Validate in // We do these checks for library users that would not call validate in
// Unmarshal. // Unmarshal.
if err := scfg.Validate(c.GlobalConfig); err != nil { if err := scfg.Validate(c.GlobalConfig); err != nil {
return nil, err return nil, err
@ -1025,9 +1026,52 @@ func CheckTargetAddress(address model.LabelValue) error {
return nil return nil
} }
// This needs to live here rather than in the remote package to avoid an import cycle. // TODO(bwplotka): Remove in the next PRs (review split PR for readability).
type RemoteWriteFormat int64 type RemoteWriteFormat int64
// RemoteWriteProtoMsg represents the known protobuf message for the remote write
// 1.0 and 2.0 specs.
type RemoteWriteProtoMsg string
// Validate returns error if the given reference for the protobuf message is not supported.
func (s RemoteWriteProtoMsg) Validate() error {
switch s {
case RemoteWriteProtoMsgV1, RemoteWriteProtoMsgV2:
return nil
default:
return fmt.Errorf("unknown remote write protobuf message %v, supported: %v", s, RemoteWriteProtoMsgs{RemoteWriteProtoMsgV1, RemoteWriteProtoMsgV2}.String())
}
}
type RemoteWriteProtoMsgs []RemoteWriteProtoMsg
func (m RemoteWriteProtoMsgs) Strings() []string {
ret := make([]string, 0, len(m))
for _, typ := range m {
ret = append(ret, string(typ))
}
return ret
}
func (m RemoteWriteProtoMsgs) String() string {
return strings.Join(m.Strings(), ",")
}
var (
// RemoteWriteProtoMsgV1 represents the deprecated `prometheus.WriteRequest` protobuf
// message introduced in the https://prometheus.io/docs/specs/remote_write_spec/.
//
// NOTE: This string is used for both HTTP header values and config value, so don't change
// this reference.
RemoteWriteProtoMsgV1 RemoteWriteProtoMsg = "prometheus.WriteRequest"
// RemoteWriteProtoMsgV2 represents the `io.prometheus.write.v2.Request` protobuf
// message introduced in https://prometheus.io/docs/specs/remote_write_spec_2_0/
//
// NOTE: This string is used for both HTTP header values and config value, so don't change
// this reference.
RemoteWriteProtoMsgV2 RemoteWriteProtoMsg = "io.prometheus.write.v2.Request"
)
// RemoteWriteConfig is the configuration for writing to remote storage. // RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct { type RemoteWriteConfig struct {
URL *config.URL `yaml:"url"` URL *config.URL `yaml:"url"`
@ -1037,7 +1081,9 @@ type RemoteWriteConfig struct {
Name string `yaml:"name,omitempty"` Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
ProtocolVersion RemoteWriteFormat `yaml:"remote_write_version,omitempty"` // ProtobufMessage specifies the protobuf message to use against the remote
// receiver as specified in https://prometheus.io/docs/specs/remote_write_spec_2_0/
ProtobufMessage RemoteWriteProtoMsg `yaml:"protobuf_message,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse // We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types. // values arbitrarily into the overflow maps of further-down types.
@ -1072,6 +1118,10 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return err return err
} }
if err := c.ProtobufMessage.Validate(); err != nil {
return fmt.Errorf("invalid protobuf_message value: %w", err)
}
// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer. // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs. // We cannot make it a pointer as the parser panics for inlined pointer structs.
// Thus we just do its validation here. // Thus we just do its validation here.

View file

@ -104,6 +104,7 @@ var expectedConf = &Config{
RemoteWriteConfigs: []*RemoteWriteConfig{ RemoteWriteConfigs: []*RemoteWriteConfig{
{ {
URL: mustParseURL("http://remote1/push"), URL: mustParseURL("http://remote1/push"),
ProtobufMessage: RemoteWriteProtoMsgV1,
RemoteTimeout: model.Duration(30 * time.Second), RemoteTimeout: model.Duration(30 * time.Second),
Name: "drop_expensive", Name: "drop_expensive",
WriteRelabelConfigs: []*relabel.Config{ WriteRelabelConfigs: []*relabel.Config{
@ -133,6 +134,7 @@ var expectedConf = &Config{
}, },
{ {
URL: mustParseURL("http://remote2/push"), URL: mustParseURL("http://remote2/push"),
ProtobufMessage: RemoteWriteProtoMsgV2,
RemoteTimeout: model.Duration(30 * time.Second), RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: DefaultQueueConfig, QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig, MetadataConfig: DefaultMetadataConfig,
@ -1794,6 +1796,10 @@ var expectedErrors = []struct {
filename: "remote_write_authorization_header.bad.yml", filename: "remote_write_authorization_header.bad.yml",
errMsg: `authorization header must be changed via the basic_auth, authorization, oauth2, sigv4, or azuread parameter`, errMsg: `authorization header must be changed via the basic_auth, authorization, oauth2, sigv4, or azuread parameter`,
}, },
{
filename: "remote_write_wrong_msg.bad.yml",
errMsg: `invalid protobuf_message value: unknown remote write protobuf message io.prometheus.writet.v2.Request, supported: prometheus.WriteRequest,io.prometheus.write.v2.Request`,
},
{ {
filename: "remote_write_url_missing.bad.yml", filename: "remote_write_url_missing.bad.yml",
errMsg: `url for remote_write is empty`, errMsg: `url for remote_write is empty`,

View file

@ -34,6 +34,7 @@ remote_write:
key_file: valid_key_file key_file: valid_key_file
- url: http://remote2/push - url: http://remote2/push
protobuf_message: io.prometheus.write.v2.Request
name: rw_tls name: rw_tls
tls_config: tls_config:
cert_file: valid_cert_file cert_file: valid_cert_file

View file

@ -0,0 +1,3 @@
remote_write:
- url: localhost:9090
protobuf_message: io.prometheus.writet.v2.Request # typo in 'write"

View file

@ -26,6 +26,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--web.enable-lifecycle</code> | Enable shutdown and reload via HTTP request. | `false` | | <code class="text-nowrap">--web.enable-lifecycle</code> | Enable shutdown and reload via HTTP request. | `false` |
| <code class="text-nowrap">--web.enable-admin-api</code> | Enable API endpoints for admin control actions. | `false` | | <code class="text-nowrap">--web.enable-admin-api</code> | Enable API endpoints for admin control actions. | `false` |
| <code class="text-nowrap">--web.enable-remote-write-receiver</code> | Enable API endpoint accepting remote write requests. | `false` | | <code class="text-nowrap">--web.enable-remote-write-receiver</code> | Enable API endpoint accepting remote write requests. | `false` |
| <code class="text-nowrap">--web.remote-write-receiver.accepted-protobuf-messages</code> | List of the remote write protobuf messages to accept when receiving the remote writes. Supported values: prometheus.WriteRequest,io.prometheus.write.v2.Request | `prometheus.WriteRequest` |
| <code class="text-nowrap">--web.console.templates</code> | Path to the console template directory, available at /consoles. | `consoles` | | <code class="text-nowrap">--web.console.templates</code> | Path to the console template directory, available at /consoles. | `consoles` |
| <code class="text-nowrap">--web.console.libraries</code> | Path to the console library directory. | `console_libraries` | | <code class="text-nowrap">--web.console.libraries</code> | Path to the console library directory. | `console_libraries` |
| <code class="text-nowrap">--web.page-title</code> | Document title of Prometheus instance. | `Prometheus Time Series Collection and Processing Server` | | <code class="text-nowrap">--web.page-title</code> | Document title of Prometheus instance. | `Prometheus Time Series Collection and Processing Server` |
@ -55,7 +56,6 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | | <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--remote-write-format</code> | remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format) | `0` |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` | | <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

View file

@ -3547,6 +3547,17 @@ this functionality.
# The URL of the endpoint to send samples to. # The URL of the endpoint to send samples to.
url: <string> url: <string>
# protobuf message to use when writing to the remote write endpoint.
#
# * The `prometheus.WriteRequest` represents the message introduced in Remote Write 1.0, which
# will be deprecated eventually.
# * The `io.prometheus.write.v2.Request` was introduced in Remote Write 2.0 and replaces the former,
# by improving efficiency and sending metadata, created timestamp and native histograms by default.
#
# Before changing this value, consult with your remote storage provider (or test) what message it supports.
# Read more on https://prometheus.io/docs/specs/remote_write_spec_2_0/#io-prometheus-write-v2-request
[ protobuf_message: <prometheus.WriteRequest | io.prometheus.write.v2.Request > | default = prometheus.WriteRequest ]
# Timeout for requests to the remote write endpoint. # Timeout for requests to the remote write endpoint.
[ remote_timeout: <duration> | default = 30s ] [ remote_timeout: <duration> | default = 30s ]
@ -3568,6 +3579,7 @@ write_relabel_configs:
[ send_exemplars: <boolean> | default = false ] [ send_exemplars: <boolean> | default = false ]
# Enables sending of native histograms, also known as sparse histograms, over remote write. # Enables sending of native histograms, also known as sparse histograms, over remote write.
# For the `io.prometheus.write.v2.Request` message, this option is noop (always true).
[ send_native_histograms: <boolean> | default = false ] [ send_native_histograms: <boolean> | default = false ]
# Sets the `Authorization` header on every remote write request with the # Sets the `Authorization` header on every remote write request with the
@ -3581,7 +3593,7 @@ basic_auth:
# Optional `Authorization` header configuration. # Optional `Authorization` header configuration.
authorization: authorization:
# Sets the authentication type. # Sets the authentication type.
[ type: <string> | default: Bearer ] [ type: <string> | default = Bearer ]
# Sets the credentials. It is mutually exclusive with # Sets the credentials. It is mutually exclusive with
# `credentials_file`. # `credentials_file`.
[ credentials: <secret> ] [ credentials: <secret> ]
@ -3645,7 +3657,7 @@ tls_config:
# contain port numbers. # contain port numbers.
[ no_proxy: <string> ] [ no_proxy: <string> ]
# Use proxy URL indicated by environment variables (HTTP_PROXY, https_proxy, HTTPs_PROXY, https_proxy, and no_proxy) # Use proxy URL indicated by environment variables (HTTP_PROXY, https_proxy, HTTPs_PROXY, https_proxy, and no_proxy)
[ proxy_from_environment: <boolean> | default: false ] [ proxy_from_environment: <boolean> | default = false ]
# Specifies headers to send to proxies during CONNECT requests. # Specifies headers to send to proxies during CONNECT requests.
[ proxy_connect_header: [ proxy_connect_header:
[ <string>: [<secret>, ...] ] ] [ <string>: [<secret>, ...] ] ]
@ -3654,7 +3666,7 @@ tls_config:
[ follow_redirects: <boolean> | default = true ] [ follow_redirects: <boolean> | default = true ]
# Whether to enable HTTP2. # Whether to enable HTTP2.
[ enable_http2: <boolean> | default: true ] [ enable_http2: <boolean> | default = true ]
# Configures the queue used to write to remote storage. # Configures the queue used to write to remote storage.
queue_config: queue_config:
@ -3683,7 +3695,10 @@ queue_config:
# which means that all samples are sent. # which means that all samples are sent.
[ sample_age_limit: <duration> | default = 0s ] [ sample_age_limit: <duration> | default = 0s ]
# Configures the sending of series metadata to remote storage. # Configures the sending of series metadata to remote storage
# if the `prometheus.WriteRequest` message was chosen. When
# `io.prometheus.write.v2.Request` is used, metadata is always sent.
#
# Metadata configuration is subject to change at any point # Metadata configuration is subject to change at any point
# or be removed in future releases. # or be removed in future releases.
metadata_config: metadata_config:

View file

@ -75,26 +75,26 @@ func TestContentNegotiation(t *testing.T) {
testcases := []struct { testcases := []struct {
name string name string
success bool success bool
qmRwFormat config.RemoteWriteFormat qmRwFormat config.RemoteWriteProtoMsg
rwFormat config.RemoteWriteFormat rwFormat config.RemoteWriteFormat
steps []contentNegotiationStep steps []contentNegotiationStep
}{ }{
// Test a simple case where the v2 request we send is processed first time. // Test a simple case where the v2 request we send is processed first time.
{ {
success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 happy path", qmRwFormat: config.RemoteWriteProtoMsgV2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"},
}, },
}, },
// Test a simple case where the v1 request we send is processed first time. // Test a simple case where the v1 request we send is processed first time.
{ {
success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ success: true, name: "v1 happy path", qmRwFormat: config.RemoteWriteProtoMsgV1, rwFormat: Version1, steps: []contentNegotiationStep{
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Test a case where the v1 request has a temporary delay but goes through on retry. // Test a case where the v1 request has a temporary delay but goes through on retry.
// There is no content re-negotiation between first and retry attempts. // There is no content re-negotiation between first and retry attempts.
{ {
success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{ success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: config.RemoteWriteProtoMsgV1, rwFormat: Version1, steps: []contentNegotiationStep{
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"},
}, },
@ -102,28 +102,28 @@ func TestContentNegotiation(t *testing.T) {
// Repeat the above test but with v2. The request has a temporary delay but goes through on retry. // Repeat the above test but with v2. The request has a temporary delay but goes through on retry.
// There is no content re-negotiation between first and retry attempts. // There is no content re-negotiation between first and retry attempts.
{ {
success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: config.RemoteWriteProtoMsgV2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"},
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"},
}, },
}, },
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade. // Now test where the server suddenly stops speaking 2.0 and we need to downgrade.
{ {
success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: config.RemoteWriteProtoMsgV2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400. // Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400.
{ {
success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: config.RemoteWriteProtoMsgV2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 400}, attemptString: "0,1,snappy,HTTP 400: msg: "}, {lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 400}, attemptString: "0,1,snappy,HTTP 400: msg: "},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
}, },
}, },
// Now test where the server flip flops between "2.0;snappy" and "0.1.0" only. // Now test where the server flip flops between "2.0;snappy" and "0.1.0" only.
{ {
success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{ success: false, name: "flip flopping", qmRwFormat: config.RemoteWriteProtoMsgV2, rwFormat: Version2, steps: []contentNegotiationStep{
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,0,snappy,HTTP 406: msg: "}, {lastRWHeader: "0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,0,snappy,HTTP 406: msg: "},
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "}, {lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
@ -167,7 +167,7 @@ func TestContentNegotiation(t *testing.T) {
queueConfig.Capacity = len(samples) queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples) queueConfig.MaxSamplesPerSend = len(samples)
// For now we only ever have a single rw config in this test. // For now we only ever have a single rw config in this test.
conf.RemoteWriteConfigs[0].ProtocolVersion = tc.qmRwFormat conf.RemoteWriteConfigs[0].ProtobufMessage = tc.qmRwFormat
require.NoError(t, s.ApplyConfig(conf)) require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig) hash, err := toHash(writeConfig)
require.NoError(t, err) require.NoError(t, err)
@ -369,7 +369,7 @@ func TestWALMetadataDelivery(t *testing.T) {
writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = cfg writeConfig.QueueConfig = cfg
writeConfig.ProtocolVersion = Version2 writeConfig.ProtobufMessage = config.RemoteWriteProtoMsgV2
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,

View file

@ -148,7 +148,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newQueues := make(map[string]*QueueManager) newQueues := make(map[string]*QueueManager)
newHashes := []string{} newHashes := []string{}
for _, rwConf := range conf.RemoteWriteConfigs { for _, rwConf := range conf.RemoteWriteConfigs {
if rwConf.ProtocolVersion > Version1 && !rws.metadataInWAL { if rwConf.ProtobufMessage == config.RemoteWriteProtoMsgV2 && !rws.metadataInWAL {
return errors.New("invalid remote write configuration, if you are using remote write version 2.0 then the feature flag for metadata records in the WAL must be enabled") return errors.New("invalid remote write configuration, if you are using remote write version 2.0 then the feature flag for metadata records in the WAL must be enabled")
} }
hash, err := toHash(rwConf) hash, err := toHash(rwConf)
@ -169,9 +169,19 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
name = rwConf.Name name = rwConf.Name
} }
// TODO(bwplotka): Remove in the next PR (split for readability).
protoVersion := func() config.RemoteWriteFormat {
switch rwConf.ProtobufMessage {
case config.RemoteWriteProtoMsgV2:
return Version2
default:
return Version1
}
}()
c, err := NewWriteClient(name, &ClientConfig{ c, err := NewWriteClient(name, &ClientConfig{
URL: rwConf.URL, URL: rwConf.URL,
RemoteWriteFormat: rwConf.ProtocolVersion, RemoteWriteFormat: protoVersion,
Timeout: rwConf.RemoteTimeout, Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig, HTTPClientConfig: rwConf.HTTPClientConfig,
SigV4Config: rwConf.SigV4Config, SigV4Config: rwConf.SigV4Config,
@ -195,7 +205,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
// Work out what protocol and compression to use for this endpoint. // Work out what protocol and compression to use for this endpoint.
// Default to Remote Write Version1. // Default to Remote Write Version1.
rwFormat := Version1 rwFormat := Version1
switch rwConf.ProtocolVersion { switch protoVersion {
case Version1: case Version1:
// We use the standard value as there's no negotiation to be had. // We use the standard value as there's no negotiation to be had.
case Version2: case Version2:

View file

@ -247,7 +247,7 @@ func NewAPI(
registerer prometheus.Registerer, registerer prometheus.Registerer,
statsRenderer StatsRenderer, statsRenderer StatsRenderer,
rwEnabled bool, rwEnabled bool,
rwFormat config.RemoteWriteFormat, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
otlpEnabled bool, otlpEnabled bool,
) *API { ) *API {
a := &API{ a := &API{
@ -290,20 +290,10 @@ func NewAPI(
} }
if rwEnabled { if rwEnabled {
// TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising. // TODO(bwplotka): Use acceptRemoteWriteProtoMsgs in the next PR (split PR for review readability).
// For rollout we do two phases: // and remove all head/negotiation.
// 0. (Before) no flags set a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, 1)
// 1. (During) support new protocols but don't advertise a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, 1)
// <wait until all servers have rolled out and now support RW2.0>
// 2. (After) support new protocols and advertise
//
// For rollback the two phases are:
// 0. (Before) support new protocols and advertise
// 1. (During) support new protocols but don't advertise
// <wait a suitable period for all sending clients to be aware that receiving servers no longer support 2.0>
// 2. (After) no flags set
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat)
a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, rwFormat)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
@ -1661,17 +1651,17 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
} }
} }
func (api *API) remoteWriteHead(w http.ResponseWriter, r *http.Request) { func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
if api.remoteWriteHeadHandler != nil { if api.remoteWriteHandler != nil {
api.remoteWriteHeadHandler.ServeHTTP(w, r) api.remoteWriteHandler.ServeHTTP(w, r)
} else { } else {
http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound) http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound)
} }
} }
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { func (api *API) remoteWriteHead(w http.ResponseWriter, r *http.Request) {
if api.remoteWriteHandler != nil { if api.remoteWriteHeadHandler != nil {
api.remoteWriteHandler.ServeHTTP(w, r) api.remoteWriteHeadHandler.ServeHTTP(w, r)
} else { } else {
http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound) http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound)
} }

View file

@ -35,7 +35,6 @@ import (
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/annotations"
) )
@ -136,7 +135,7 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
nil, nil,
nil, nil,
false, false,
remote.Version1, config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2},
false, // Disable experimental reduce remote write proto support. false, // Disable experimental reduce remote write proto support.
) )

View file

@ -264,8 +264,8 @@ type Options struct {
EnableOTLPWriteReceiver bool EnableOTLPWriteReceiver bool
IsAgent bool IsAgent bool
AppName string AppName string
// TODO(cstyan): should this change to a list of tuples, maybe via the content negotiation PR?
RemoteWriteFormat config.RemoteWriteFormat AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
Registerer prometheus.Registerer Registerer prometheus.Registerer
@ -355,7 +355,7 @@ func New(logger log.Logger, o *Options) *Handler {
o.Registerer, o.Registerer,
nil, nil,
o.EnableRemoteWriteReceiver, o.EnableRemoteWriteReceiver,
o.RemoteWriteFormat, o.AcceptRemoteWriteProtoMsgs,
o.EnableOTLPWriteReceiver, o.EnableOTLPWriteReceiver,
) )