Add 1.1 version handling code

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
alexgreenbank 2023-10-12 17:34:51 +00:00 committed by Nicolás Pazos
parent 98a0d00eb2
commit d1d80b8bed
5 changed files with 32 additions and 3 deletions

View file

@ -1016,6 +1016,7 @@ func CheckTargetAddress(address model.LabelValue) error {
// RemoteWriteConfig is the configuration for writing to remote storage. // RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct { type RemoteWriteConfig struct {
URL *config.URL `yaml:"url"` URL *config.URL `yaml:"url"`
Version string `yaml:"version,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"` Headers map[string]string `yaml:"headers,omitempty"`
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
@ -1047,6 +1048,9 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
if c.URL == nil { if c.URL == nil {
return errors.New("url for remote_write is empty") return errors.New("url for remote_write is empty")
} }
if c.Version != "" && c.Version != "1.0" && c.Version != "1.1" {
return errors.New("version for remote_write must be either '1.0', '1.1' or blank/unspecified")
}
for _, rlcfg := range c.WriteRelabelConfigs { for _, rlcfg := range c.WriteRelabelConfigs {
if rlcfg == nil { if rlcfg == nil {
return errors.New("empty or null relabeling rule in remote write config") return errors.New("empty or null relabeling rule in remote write config")

View file

@ -83,6 +83,7 @@ func init() {
type Client struct { type Client struct {
remoteName string // Used to differentiate clients in metrics. remoteName string // Used to differentiate clients in metrics.
urlString string // url.String() urlString string // url.String()
version string // For write clients; "", "1.0" or "1.1", ignored for read clients
Client *http.Client Client *http.Client
timeout time.Duration timeout time.Duration
@ -96,6 +97,7 @@ type Client struct {
// ClientConfig configures a client. // ClientConfig configures a client.
type ClientConfig struct { type ClientConfig struct {
URL *config_util.URL URL *config_util.URL
Version string
Timeout model.Duration Timeout model.Duration
HTTPClientConfig config_util.HTTPClientConfig HTTPClientConfig config_util.HTTPClientConfig
SigV4Config *sigv4.SigV4Config SigV4Config *sigv4.SigV4Config
@ -126,6 +128,7 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
return &Client{ return &Client{
remoteName: name, remoteName: name,
urlString: conf.URL.String(), urlString: conf.URL.String(),
version: conf.Version,
Client: httpClient, Client: httpClient,
timeout: time.Duration(conf.Timeout), timeout: time.Duration(conf.Timeout),
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
@ -207,7 +210,12 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", UserAgent) httpReq.Header.Set("User-Agent", UserAgent)
if c.version == "1.1" {
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.1") // TODO-RW11: Final value?
} else {
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
}
if attempt > 0 { if attempt > 0 {
httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt)) httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt))
} }
@ -229,6 +237,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
httpResp.Body.Close() httpResp.Body.Close()
}() }()
// TODO-RW11: Here is where we need to handle version downgrade on error
if httpResp.StatusCode/100 != 2 { if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
line := "" line := ""
@ -270,6 +280,11 @@ func (c Client) Endpoint() string {
return c.urlString return c.urlString
} }
// Version of the remote write client
func (c Client) Version() string {
return c.version
}
// Read reads from a remote endpoint. // Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
c.readQueries.Inc() c.readQueries.Inc()
@ -365,3 +380,7 @@ func (c *TestClient) Name() string {
func (c *TestClient) Endpoint() string { func (c *TestClient) Endpoint() string {
return c.url return c.url
} }
func (c *TestClient) Version() string {
return ""
}

View file

@ -386,6 +386,8 @@ type WriteClient interface {
Name() string Name() string
// Endpoint is the remote read or write endpoint for the storage client. // Endpoint is the remote read or write endpoint for the storage client.
Endpoint() string Endpoint() string
// Version is the remote write version for the storage client.
Version() string
} }
// QueueManager manages a queue of samples to be sent to the Storage // QueueManager manages a queue of samples to be sent to the Storage
@ -1412,7 +1414,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok { if !ok {
return return
} }
if s.qm.internFormat { if s.qm.internFormat && s.qm.client().Version() == "1.1" {
// the new internFormat bool must be set as well as the client version being 1.1
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
@ -1430,7 +1433,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case <-timer.C: case <-timer.C:
batch := queue.Batch() batch := queue.Batch()
if len(batch) > 0 { if len(batch) > 0 {
if s.qm.internFormat { if s.qm.internFormat && s.qm.client().Version() == "1.1" {
// the new internFormat bool must be set as well as the client version being 1.1
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,

View file

@ -167,6 +167,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
c, err := NewWriteClient(name, &ClientConfig{ c, err := NewWriteClient(name, &ClientConfig{
URL: rwConf.URL, URL: rwConf.URL,
Version: rwConf.Version,
Timeout: rwConf.RemoteTimeout, Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig, HTTPClientConfig: rwConf.HTTPClientConfig,
SigV4Config: rwConf.SigV4Config, SigV4Config: rwConf.SigV4Config,

View file

@ -67,6 +67,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error var err error
var req *prompb.WriteRequest var req *prompb.WriteRequest
var reqWithRefs *prompb.WriteRequestWithRefs var reqWithRefs *prompb.WriteRequestWithRefs
// TODO-RW11: Need to check headers to decide what version is and what to do
if h.internFormat { if h.internFormat {
reqWithRefs, err = DecodeReducedWriteRequest(r.Body) reqWithRefs, err = DecodeReducedWriteRequest(r.Body)
} else { } else {