Changed the configuration allowing content-type and encoding preferences for both sending and receiving.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2024-04-22 13:19:19 +01:00
parent 7b88101cf5
commit 7322f421fd
6 changed files with 357 additions and 183 deletions

View file

@ -158,8 +158,6 @@ type flagConfig struct {
enableNewSDManager bool enableNewSDManager bool
enablePerStepStats bool enablePerStepStats bool
enableAutoGOMAXPROCS bool enableAutoGOMAXPROCS bool
// todo: how to use the enable feature flag properly + use the remote format enum type
rwFormat int
enableAutoGOMEMLIMIT bool enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool enableConcurrentRuleEval bool
@ -314,6 +312,12 @@ func main() {
a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests."). a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests.").
Default("false").BoolVar(&cfg.web.EnableRemoteWriteReceiver) Default("false").BoolVar(&cfg.web.EnableRemoteWriteReceiver)
a.Flag("web.remote-write-receiver.protobuf-types", fmt.Sprintf("List of accepted remote write 2.0 content types to advertise to senders, ordered by the preference. Note that the final decision is on the sender. Supported list values: %v", config.DefaultRemoteWriteProtoTypes.String())).
Default(config.DefaultRemoteWriteProtoTypes.Strings()...).SetValue(rwProtoTypeFlagValue(&cfg.web.RemoteWriteReceiverProtoTypes))
a.Flag("web.remote-write-receiver.compressions", fmt.Sprintf("List of accepted remote write 2.0 content encodings (compressions) to advertise to senders, ordered by the preference. Note that the final decision is on the sender. Supported list values: %v", config.DefaultRemoteWriteCompressions.String())).
Default(config.DefaultRemoteWriteCompressions.Strings()...).SetValue(rwCompressionFlagValue(&cfg.web.RemoteWriteReceiverCompressions))
a.Flag("web.console.templates", "Path to the console template directory, available at /consoles."). a.Flag("web.console.templates", "Path to the console template directory, available at /consoles.").
Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath) Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath)
@ -455,9 +459,6 @@ func main() {
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList) Default("").StringsVar(&cfg.featureList)
a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)").
Default("0").IntVar(&cfg.rwFormat)
promlogflag.AddFlags(a, &cfg.promlogConfig) promlogflag.AddFlags(a, &cfg.promlogConfig)
a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error { a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {
@ -820,7 +821,6 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String() cfg.web.Flags[f.Name] = f.Value.String()
} }
cfg.web.RemoteWriteFormat = config.RemoteWriteFormat(cfg.rwFormat)
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager. // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
@ -1737,3 +1737,63 @@ type discoveryManager interface {
Run() error Run() error
SyncCh() <-chan map[string][]*targetgroup.Group SyncCh() <-chan map[string][]*targetgroup.Group
} }
// TODO(bwplotka): Add unit test.
type rwProtoTypeFlagParser struct {
types *[]config.RemoteWriteProtoType
}
func rwProtoTypeFlagValue(types *[]config.RemoteWriteProtoType) kingpin.Value {
return &rwProtoTypeFlagParser{types: types}
}
func (p *rwProtoTypeFlagParser) IsCumulative() bool {
return true
}
func (p *rwProtoTypeFlagParser) String() string {
ss := make([]string, 0, len(*p.types))
for _, t := range *p.types {
ss = append(ss, string(t))
}
return strings.Join(ss, ",")
}
func (p *rwProtoTypeFlagParser) Set(opt string) error {
t := config.RemoteWriteProtoType(opt)
if err := t.Validate(); err != nil {
return err
}
*p.types = append(*p.types, t)
return nil
}
// TODO(bwplotka): Add unit test.
type rwCompressionFlagParser struct {
types *[]config.RemoteWriteCompression
}
func rwCompressionFlagValue(types *[]config.RemoteWriteCompression) kingpin.Value {
return &rwCompressionFlagParser{types: types}
}
func (p *rwCompressionFlagParser) IsCumulative() bool {
return true
}
func (p *rwCompressionFlagParser) String() string {
ss := make([]string, 0, len(*p.types))
for _, t := range *p.types {
ss = append(ss, string(t))
}
return strings.Join(ss, ",")
}
func (p *rwCompressionFlagParser) Set(opt string) error {
t := config.RemoteWriteCompression(opt)
if err := t.Validate(); err != nil {
return err
}
*p.types = append(*p.types, t)
return nil
}

View file

@ -173,6 +173,9 @@ var (
// DefaultRemoteWriteConfig is the default remote write configuration. // DefaultRemoteWriteConfig is the default remote write configuration.
DefaultRemoteWriteConfig = RemoteWriteConfig{ DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second), RemoteTimeout: model.Duration(30 * time.Second),
ProtobufTypes: DefaultRemoteWriteProtoTypes,
Compressions: DefaultRemoteWriteCompressions,
QueueConfig: DefaultQueueConfig, QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig, MetadataConfig: DefaultMetadataConfig,
HTTPClientConfig: config.DefaultHTTPClientConfig, HTTPClientConfig: config.DefaultHTTPClientConfig,
@ -1025,9 +1028,6 @@ func CheckTargetAddress(address model.LabelValue) error {
return nil return nil
} }
// This needs to live here rather than in the remote package to avoid an import cycle.
type RemoteWriteFormat int64
// RemoteWriteConfig is the configuration for writing to remote storage. // RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct { type RemoteWriteConfig struct {
URL *config.URL `yaml:"url"` URL *config.URL `yaml:"url"`
@ -1037,10 +1037,9 @@ type RemoteWriteConfig struct {
Name string `yaml:"name,omitempty"` Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
ProtocolVersion RemoteWriteFormat `yaml:"remote_write_version,omitempty"` ProtobufTypes []RemoteWriteProtoType `yaml:"proto_types,omitempty"`
Compressions []RemoteWriteCompression `yaml:"compressions,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
QueueConfig QueueConfig `yaml:"queue_config,omitempty"` QueueConfig QueueConfig `yaml:"queue_config,omitempty"`
MetadataConfig MetadataConfig `yaml:"metadata_config,omitempty"` MetadataConfig MetadataConfig `yaml:"metadata_config,omitempty"`
@ -1072,6 +1071,20 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return err return err
} }
if c.ProtobufTypes == nil {
c.ProtobufTypes = DefaultRemoteWriteProtoTypes
}
if err := validateRemoteWriteProtoTypes(c.ProtobufTypes); err != nil {
return fmt.Errorf("invalid protobuf_types value: %w", err)
}
if c.Compressions == nil {
c.Compressions = DefaultRemoteWriteCompressions
}
if err := validateRemoteWriteCompressions(c.Compressions); err != nil {
return fmt.Errorf("invalid compressions value: %w", err)
}
// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer. // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs. // We cannot make it a pointer as the parser panics for inlined pointer structs.
// Thus we just do its validation here. // Thus we just do its validation here.

View file

@ -0,0 +1,162 @@
package config
import (
"errors"
"fmt"
"sort"
"strings"
)
// TODO(bwplotka): Consider an util for "preference enums" as it's similar code for rw compression, proto type and scrape protocol.
// RemoteWriteProtoType represents the supported protobuf types for the remote write.
type RemoteWriteProtoType string
// Validate returns error if the given protobuf type is not supported.
func (s RemoteWriteProtoType) Validate() error {
if _, ok := RemoteWriteContentTypeHeaders[s]; !ok {
return fmt.Errorf("unknown remote write protobuf type %v, supported: %v",
s, func() (ret []string) {
for k := range RemoteWriteContentTypeHeaders {
ret = append(ret, string(k))
}
sort.Strings(ret)
return ret
}())
}
return nil
}
type RemoteWriteProtoTypes []RemoteWriteProtoType
func (t RemoteWriteProtoTypes) Strings() []string {
ret := make([]string, 0, len(t))
for _, typ := range t {
ret = append(ret, string(typ))
}
return ret
}
func (t RemoteWriteProtoTypes) String() string {
return strings.Join(t.Strings(), ",")
}
// ServerAcceptHeaderValue returns server Accept header value for
// given list of proto types as per RFC 9110 https://www.rfc-editor.org/rfc/rfc9110.html#section-12.5.1-14
func (t RemoteWriteProtoTypes) ServerAcceptHeaderValue() string {
// TODO(bwplotka): Consider implementing an optional quality factor.
ret := make([]string, 0, len(t))
for _, typ := range t {
ret = append(ret, RemoteWriteContentTypeHeaders[typ])
}
return strings.Join(ret, ",")
}
var (
RemoteWriteProtoTypeV1 RemoteWriteProtoType = "v1.WriteRequest"
RemoteWriteProtoTypeV2 RemoteWriteProtoType = "v2.WriteRequest"
RemoteWriteContentTypeHeaders = map[RemoteWriteProtoType]string{
RemoteWriteProtoTypeV1: "application/x-protobuf", // Also application/x-protobuf; proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec.
RemoteWriteProtoTypeV2: "application/x-protobuf;proto=io.prometheus.remote.write.v2.WriteRequest",
}
// DefaultRemoteWriteProtoTypes is the set of remote write protobuf types that will be
// preferred by the remote write client.
DefaultRemoteWriteProtoTypes = RemoteWriteProtoTypes{
RemoteWriteProtoTypeV1,
RemoteWriteProtoTypeV2,
}
)
// validateRemoteWriteProtoTypes return errors if we see problems with rw protobuf types in
// the Prometheus configuration.
func validateRemoteWriteProtoTypes(ts []RemoteWriteProtoType) error {
if len(ts) == 0 {
return errors.New("protobuf_types cannot be empty")
}
dups := map[string]struct{}{}
for _, t := range ts {
if _, ok := dups[strings.ToLower(string(t))]; ok {
return fmt.Errorf("duplicated protobuf types in protobuf_types, got %v", ts)
}
if err := t.Validate(); err != nil {
return fmt.Errorf("protobuf_types: %w", err)
}
dups[strings.ToLower(string(t))] = struct{}{}
}
return nil
}
// RemoteWriteCompression represents the supported compressions for the remote write.
type RemoteWriteCompression string
// Validate returns error if the given protobuf type is not supported.
func (s RemoteWriteCompression) Validate() error {
if _, ok := RemoteWriteContentEncodingHeaders[s]; !ok {
return fmt.Errorf("unknown remote write protobuf type %v, supported: %v",
s, func() (ret []string) {
for k := range RemoteWriteContentEncodingHeaders {
ret = append(ret, string(k))
}
sort.Strings(ret)
return ret
}())
}
return nil
}
type RemoteWriteCompressions []RemoteWriteCompression
func (cs RemoteWriteCompressions) Strings() []string {
ret := make([]string, 0, len(cs))
for _, c := range cs {
ret = append(ret, string(c))
}
return ret
}
func (cs RemoteWriteCompressions) String() string {
return strings.Join(cs.Strings(), ",")
}
// ServerAcceptEncodingHeaderValue returns server Accept-Encoding header value for
// given list of compressions as per RFC 9110 https://www.rfc-editor.org/rfc/rfc9110.html#name-accept-encoding
func (cs RemoteWriteCompressions) ServerAcceptEncodingHeaderValue() string {
// TODO(bwplotka): Consider implementing an optional quality factor.
ret := make([]string, 0, len(cs))
for _, typ := range cs {
ret = append(ret, RemoteWriteContentEncodingHeaders[typ])
}
return strings.Join(ret, ",")
}
// validateRemoteWriteCompressions return errors if we see problems with rw compressions in
// the Prometheus configuration.
func validateRemoteWriteCompressions(cs []RemoteWriteCompression) error {
if len(cs) == 0 {
return errors.New("compressions cannot be empty")
}
dups := map[string]struct{}{}
for _, c := range cs {
if _, ok := dups[strings.ToLower(string(c))]; ok {
return fmt.Errorf("duplicated compression in compressions, got %v", cs)
}
if err := c.Validate(); err != nil {
return fmt.Errorf("compressions: %w", err)
}
dups[strings.ToLower(string(c))] = struct{}{}
}
return nil
}
var (
RemoteWriteCompressionSnappy RemoteWriteCompression = "snappy"
RemoteWriteContentEncodingHeaders = map[RemoteWriteCompression]string{
RemoteWriteCompressionSnappy: "snappy",
}
DefaultRemoteWriteCompressions = RemoteWriteCompressions{
RemoteWriteCompressionSnappy,
}
)

View file

@ -19,7 +19,6 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"strings"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
@ -38,89 +37,31 @@ import (
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
) )
const (
RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version"
RemoteWriteVersion1HeaderValue = "0.1.0"
RemoteWriteVersion20HeaderValue = "2.0"
)
func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
// Return the correct remote write header name/values based on provided rwFormat.
ret := make(map[string]string, 1)
switch rwFormat {
case Version1:
ret[RemoteWriteVersionHeader] = RemoteWriteVersion1HeaderValue
case Version2:
// We need to add the supported protocol definitions in order:
tuples := make([]string, 0, 2)
// Add "2.0;snappy".
tuples = append(tuples, RemoteWriteVersion20HeaderValue+";snappy")
// Add default "0.1.0".
tuples = append(tuples, RemoteWriteVersion1HeaderValue)
ret[RemoteWriteVersionHeader] = strings.Join(tuples, ",")
}
return ret
}
type writeHeadHandler struct {
logger log.Logger
remoteWriteHeadRequests prometheus.Counter
// Experimental feature, new remote write proto format.
// The handler will accept the new format, but it can still accept the old one.
rwFormat config.RemoteWriteFormat
}
func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat) http.Handler {
h := &writeHeadHandler{
logger: logger,
rwFormat: rwFormat,
remoteWriteHeadRequests: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "api",
Name: "remote_write_head_requests",
Help: "The number of remote write HEAD requests.",
}),
}
if reg != nil {
reg.MustRegister(h.remoteWriteHeadRequests)
}
return h
}
// Send a response to the HEAD request based on the format supported.
func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Add appropriate header values for the specific rwFormat.
for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
w.Header().Set(hName, hValue)
}
// Increment counter
h.remoteWriteHeadRequests.Inc()
w.WriteHeader(http.StatusOK)
}
type writeHandler struct { type writeHandler struct {
logger log.Logger logger log.Logger
appendable storage.Appendable appendable storage.Appendable
samplesWithInvalidLabelsTotal prometheus.Counter samplesWithInvalidLabelsTotal prometheus.Counter
// Experimental feature, new remote write proto format. // PRW 2.0 definies backward compatible content negotiation across prototypes
// The handler will accept the new format, but it can still accept the old one. // and compressions.
rwFormat config.RemoteWriteFormat protoTypes []config.RemoteWriteProtoType
acceptHeaderValue string
compressions []config.RemoteWriteCompression
acceptEncodingHeaderValue string
} }
// NewWriteHandler creates a http.Handler that accepts remote write requests and // NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable. // writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat config.RemoteWriteFormat) http.Handler { // Compatible with both 1.x and 2.0 spec.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, protoTypes []config.RemoteWriteProtoType, compressions []config.RemoteWriteCompression) http.Handler {
h := &writeHandler{ h := &writeHandler{
logger: logger, logger: logger,
appendable: appendable, appendable: appendable,
rwFormat: rwFormat, protoTypes: protoTypes,
acceptHeaderValue: config.RemoteWriteProtoTypes(protoTypes).ServerAcceptHeaderValue(),
compressions: compressions,
acceptEncodingHeaderValue: config.RemoteWriteCompressions(compressions).ServerAcceptEncodingHeaderValue(),
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus", Namespace: "prometheus",
Subsystem: "api", Subsystem: "api",
@ -134,35 +75,59 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
return h return h
} }
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHandler) parseEncoding(contentEncoding string) (config.RemoteWriteCompression, error) {
var err error // TODO(bwplotka): TBD
got := config.RemoteWriteCompression(contentEncoding)
// Set the header(s) in the response based on the rwFormat the server supports. if err := got.Validate(); err != nil {
for hName, hValue := range rwHeaderNameValues(h.rwFormat) { return "", err // TODO(bwplotka): Wrap properly
w.Header().Set(hName, hValue)
} }
// Parse the headers to work out how to handle this. // TODO(bwplotka): Build utils for this.
contentEncoding := r.Header.Get("Content-Encoding") for _, c := range h.compressions {
protoVer := r.Header.Get(RemoteWriteVersionHeader) if c == got {
return got, nil
}
}
return "", fmt.Errorf("unsupported compression, got %v, supported %v", got, h.acceptEncodingHeaderValue)
}
switch protoVer { func (h *writeHandler) parseType(contentType string) (config.RemoteWriteProtoType, error) {
case "": // TODO(bwplotka): TBD
// No header provided, assume 0.1.0 as everything that relies on later. got := config.RemoteWriteProtoType(contentType)
protoVer = RemoteWriteVersion1HeaderValue if err := got.Validate(); err != nil {
case RemoteWriteVersion1HeaderValue, RemoteWriteVersion20HeaderValue: return "", err // TODO(bwplotka): Wrap properly
// We know this header, woo. }
default:
// We have a version in the header but it is not one we recognise. // TODO(bwplotka): Build utils for this.
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer) for _, c := range h.protoTypes {
// Return a 406 so that the client can choose a more appropriate protocol to use. if c == got {
http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable) return got, nil
}
}
return "", fmt.Errorf("unsupported proto type, got %v, supported %v", got, h.acceptHeaderValue)
}
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Start with ensuring receiver response headers indicate PRW 2.0 and accepted
// content type and compressions.
w.Header().Set("Accept", h.acceptHeaderValue)
w.Header().Set("Accept-Encoding", h.acceptEncodingHeaderValue)
// Initial content type and encoding negotiation.
// NOTE: PRW 2.0 deprecated X-Prometheus-Remote-Write-Version, ignore it, we
// rely on content-type header only.
enc, err := h.parseEncoding(r.Header.Get("Content-Encoding"))
if err != nil {
w.WriteHeader(http.StatusUnsupportedMediaType)
w.Write([]byte(err.Error())) // TODO(bwplotka) Format? Log?
return return
} }
typ, err := h.parseType(r.Header.Get("Content-Type"))
// Deal with 0.1.0 clients that forget to send Content-Encoding. if err != nil {
if protoVer == RemoteWriteVersion1HeaderValue && contentEncoding == "" { w.WriteHeader(http.StatusUnsupportedMediaType)
contentEncoding = "snappy" w.Write([]byte(err.Error())) // TODO(bwplotka) Format? Log? use http.Error
return
} }
// Read the request body. // Read the request body.
@ -173,11 +138,9 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
// Deal with contentEncoding first.
var decompressed []byte var decompressed []byte
switch enc {
switch contentEncoding { case config.RemoteWriteCompressionSnappy:
case "snappy":
decompressed, err = snappy.Decode(nil, body) decompressed, err = snappy.Decode(nil, body)
if err != nil { if err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
@ -185,16 +148,11 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
default: default:
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding) panic("should not happen")
// Return a 406 so that the client can choose a more appropriate protocol to use.
http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable)
return
} }
// Now we have a decompressed buffer we can unmarshal it. switch typ {
// At this point we are happy with the version but need to check the encoding. case config.RemoteWriteProtoTypeV1:
switch protoVer {
case RemoteWriteVersion1HeaderValue:
var req prompb.WriteRequest var req prompb.WriteRequest
if err := proto.Unmarshal(decompressed, &req); err != nil { if err := proto.Unmarshal(decompressed, &req); err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
@ -202,7 +160,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
err = h.write(r.Context(), &req) err = h.write(r.Context(), &req)
case RemoteWriteVersion20HeaderValue: case config.RemoteWriteProtoTypeV2:
// 2.0 request. // 2.0 request.
var reqMinStr writev2.WriteRequest var reqMinStr writev2.WriteRequest
if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil { if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil {
@ -211,6 +169,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
err = h.writeMinStr(r.Context(), &reqMinStr) err = h.writeMinStr(r.Context(), &reqMinStr)
default:
panic("should not happen")
} }
switch { switch {

View file

@ -210,7 +210,6 @@ type API struct {
isAgent bool isAgent bool
statsRenderer StatsRenderer statsRenderer StatsRenderer
remoteWriteHeadHandler http.Handler
remoteWriteHandler http.Handler remoteWriteHandler http.Handler
remoteReadHandler http.Handler remoteReadHandler http.Handler
otlpWriteHandler http.Handler otlpWriteHandler http.Handler
@ -247,7 +246,8 @@ func NewAPI(
registerer prometheus.Registerer, registerer prometheus.Registerer,
statsRenderer StatsRenderer, statsRenderer StatsRenderer,
rwEnabled bool, rwEnabled bool,
rwFormat config.RemoteWriteFormat, rwProtoTypes []config.RemoteWriteProtoType,
rwCompressions []config.RemoteWriteCompression,
otlpEnabled bool, otlpEnabled bool,
) *API { ) *API {
a := &API{ a := &API{
@ -290,20 +290,7 @@ func NewAPI(
} }
if rwEnabled { if rwEnabled {
// TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising. a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwProtoTypes, rwCompressions)
// For rollout we do two phases:
// 0. (Before) no flags set
// 1. (During) support new protocols but don't advertise
// <wait until all servers have rolled out and now support RW2.0>
// 2. (After) support new protocols and advertise
//
// For rollback the two phases are:
// 0. (Before) support new protocols and advertise
// 1. (During) support new protocols but don't advertise
// <wait a suitable period for all sending clients to be aware that receiving servers no longer support 2.0>
// 2. (After) no flags set
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat)
a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, rwFormat)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
@ -400,7 +387,6 @@ func (api *API) Register(r *route.Router) {
r.Get("/status/walreplay", api.serveWALReplayStatus) r.Get("/status/walreplay", api.serveWALReplayStatus)
r.Post("/read", api.ready(api.remoteRead)) r.Post("/read", api.ready(api.remoteRead))
r.Post("/write", api.ready(api.remoteWrite)) r.Post("/write", api.ready(api.remoteWrite))
r.Head("/write", api.remoteWriteHead)
r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite)) r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite))
r.Get("/alerts", wrapAgent(api.alerts)) r.Get("/alerts", wrapAgent(api.alerts))
@ -1661,14 +1647,6 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
} }
} }
func (api *API) remoteWriteHead(w http.ResponseWriter, r *http.Request) {
if api.remoteWriteHeadHandler != nil {
api.remoteWriteHeadHandler.ServeHTTP(w, r)
} else {
http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound)
}
}
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
if api.remoteWriteHandler != nil { if api.remoteWriteHandler != nil {
api.remoteWriteHandler.ServeHTTP(w, r) api.remoteWriteHandler.ServeHTTP(w, r)

View file

@ -261,11 +261,11 @@ type Options struct {
RemoteReadConcurrencyLimit int RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool EnableRemoteWriteReceiver bool
RemoteWriteReceiverProtoTypes []config.RemoteWriteProtoType
RemoteWriteReceiverCompressions []config.RemoteWriteCompression
EnableOTLPWriteReceiver bool EnableOTLPWriteReceiver bool
IsAgent bool IsAgent bool
AppName string AppName string
// TODO(cstyan): should this change to a list of tuples, maybe via the content negotiation PR?
RemoteWriteFormat config.RemoteWriteFormat
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
Registerer prometheus.Registerer Registerer prometheus.Registerer
@ -355,7 +355,8 @@ func New(logger log.Logger, o *Options) *Handler {
o.Registerer, o.Registerer,
nil, nil,
o.EnableRemoteWriteReceiver, o.EnableRemoteWriteReceiver,
o.RemoteWriteFormat, o.RemoteWriteReceiverProtoTypes,
o.RemoteWriteReceiverCompressions,
o.EnableOTLPWriteReceiver, o.EnableOTLPWriteReceiver,
) )