Custom headers on remote-read and refactor implementation to roundtripper.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
This commit is contained in:
Harkishen-Singh 2021-02-18 17:42:21 +05:30
parent 537c0aff49
commit 79ba53a6c4
6 changed files with 66 additions and 20 deletions

View file

@ -33,21 +33,24 @@ import (
)
var (
patRulePath = regexp.MustCompile(`^[^*]*(\*[^/]*)?$`)
unchangeableHeaders = map[string]struct{}{
patRulePath = regexp.MustCompile(`^[^*]*(\*[^/]*)?$`)
reservedHeaders = map[string]struct{}{
// NOTE: authorization is checked specially,
// see RemoteWriteConfig.UnmarshalYAML.
// "authorization": {},
"host": {},
"content-encoding": {},
"content-length": {},
"content-type": {},
"x-prometheus-remote-write-version": {},
"user-agent": {},
"connection": {},
"keep-alive": {},
"proxy-authenticate": {},
"proxy-authorization": {},
"www-authenticate": {},
"accept-encoding": {},
"x-prometheus-remote-write-version": {},
"x-prometheus-remote-read-version": {},
}
)
@ -616,13 +619,8 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return errors.New("empty or null relabeling rule in remote write config")
}
}
for header := range c.Headers {
if strings.ToLower(header) == "authorization" {
return errors.New("authorization header must be changed via the basic_auth or authorization parameter")
}
if _, ok := unchangeableHeaders[strings.ToLower(header)]; ok {
return errors.Errorf("%s is an unchangeable header", header)
}
if err := validateHeaders(c.Headers); err != nil {
return err
}
// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
@ -631,6 +629,18 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return c.HTTPClientConfig.Validate()
}
func validateHeaders(headers map[string]string) error {
for header := range headers {
if strings.ToLower(header) == "authorization" {
return errors.New("authorization header must be changed via the basic_auth or authorization parameter")
}
if _, ok := reservedHeaders[strings.ToLower(header)]; ok {
return errors.Errorf("%s is a reserved header. It must not be changed", header)
}
}
return nil
}
// QueueConfig is the configuration for the queue used to write to remote
// storage.
type QueueConfig struct {
@ -667,10 +677,11 @@ type MetadataConfig struct {
// RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct {
URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
ReadRecent bool `yaml:"read_recent,omitempty"`
Name string `yaml:"name,omitempty"`
URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
ReadRecent bool `yaml:"read_recent,omitempty"`
Name string `yaml:"name,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
@ -696,6 +707,9 @@ func (c *RemoteReadConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
if c.URL == nil {
return errors.New("url for remote_read is empty")
}
if err := validateHeaders(c.Headers); err != nil {
return err
}
// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs.
// Thus we just do its validation here.

View file

@ -972,7 +972,10 @@ var expectedErrors = []struct {
errMsg: `url for remote_read is empty`,
}, {
filename: "remote_write_header.bad.yml",
errMsg: `x-prometheus-remote-write-version is an unchangeable header`,
errMsg: `x-prometheus-remote-write-version is a reserved header. It must not be changed`,
}, {
filename: "remote_read_header.bad.yml",
errMsg: `x-prometheus-remote-write-version is a reserved header. It must not be changed`,
}, {
filename: "remote_write_authorization_header.bad.yml",
errMsg: `authorization header must be changed via the basic_auth or authorization parameter`,

View file

@ -0,0 +1,5 @@
remote_read:
- url: localhost:9090
name: queue1
headers:
"x-prometheus-remote-write-version": "somehack"

View file

@ -1850,6 +1850,11 @@ required_matchers:
# Timeout for requests to the remote read endpoint.
[ remote_timeout: <duration> | default = 1m ]
# Custom HTTP headers to be sent along with each remote read request.
# Be aware that headers that are set by Prometheus itself can't be overwritten.
headers:
[ <string>: <string> ... ]
# Whether reads should be made for queries for time ranges that
# the local storage should have complete data for.
[ read_recent: <boolean> | default = false ]

View file

@ -83,7 +83,6 @@ type Client struct {
url *config_util.URL
Client *http.Client
timeout time.Duration
headers map[string]string
retryOnRateLimit bool
@ -115,6 +114,9 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
}
t := httpClient.Transport
if len(conf.Headers) > 0 {
t = newInjectHeadersRoundTripper(conf.Headers, t)
}
httpClient.Transport = &nethttp.Transport{
RoundTripper: t,
}
@ -138,6 +140,9 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
}
t := httpClient.Transport
if len(conf.Headers) > 0 {
t = newInjectHeadersRoundTripper(conf.Headers, t)
}
httpClient.Transport = &nethttp.Transport{
RoundTripper: t,
}
@ -148,10 +153,25 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
Client: httpClient,
retryOnRateLimit: conf.RetryOnRateLimit,
timeout: time.Duration(conf.Timeout),
headers: conf.Headers,
}, nil
}
func newInjectHeadersRoundTripper(h map[string]string, underlyingRT http.RoundTripper) *injectHeadersRoundTripper {
return &injectHeadersRoundTripper{headers: h, RoundTripper: underlyingRT}
}
type injectHeadersRoundTripper struct {
headers map[string]string
http.RoundTripper
}
func (t *injectHeadersRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
for key, value := range t.headers {
req.Header.Set(key, value)
}
return t.RoundTripper.RoundTrip(req)
}
const defaultBackoff = 0
type RecoverableError struct {
@ -168,9 +188,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
// recoverable.
return err
}
for k, v := range c.headers {
httpReq.Header.Set(k, v)
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", UserAgent)

View file

@ -111,6 +111,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
URL: rrConf.URL,
Timeout: rrConf.RemoteTimeout,
HTTPClientConfig: rrConf.HTTPClientConfig,
Headers: rrConf.Headers,
})
if err != nil {
return err