From d1d80b8bede5dfd00a1c42f4ad08ec72343594c2 Mon Sep 17 00:00:00 2001 From: alexgreenbank Date: Thu, 12 Oct 2023 17:34:51 +0000 Subject: [PATCH] Add 1.1 version handling code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nicolás Pazos --- config/config.go | 4 ++++ storage/remote/client.go | 21 ++++++++++++++++++++- storage/remote/queue_manager.go | 8 ++++++-- storage/remote/write.go | 1 + storage/remote/write_handler.go | 1 + 5 files changed, 32 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index ddcca84dc..2719e4ed0 100644 --- a/config/config.go +++ b/config/config.go @@ -1016,6 +1016,7 @@ func CheckTargetAddress(address model.LabelValue) error { // RemoteWriteConfig is the configuration for writing to remote storage. type RemoteWriteConfig struct { URL *config.URL `yaml:"url"` + Version string `yaml:"version,omitempty"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` Headers map[string]string `yaml:"headers,omitempty"` WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` @@ -1047,6 +1048,9 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err if c.URL == nil { return errors.New("url for remote_write is empty") } + if c.Version != "" && c.Version != "1.0" && c.Version != "1.1" { + return errors.New("version for remote_write must be either '1.0', '1.1' or blank/unspecified") + } for _, rlcfg := range c.WriteRelabelConfigs { if rlcfg == nil { return errors.New("empty or null relabeling rule in remote write config") diff --git a/storage/remote/client.go b/storage/remote/client.go index da29aafe5..386058338 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -83,6 +83,7 @@ func init() { type Client struct { remoteName string // Used to differentiate clients in metrics. urlString string // url.String() + version string // For write clients; "", "1.0" or "1.1", ignored for read clients Client *http.Client timeout time.Duration @@ -96,6 +97,7 @@ type Client struct { // ClientConfig configures a client. type ClientConfig struct { URL *config_util.URL + Version string Timeout model.Duration HTTPClientConfig config_util.HTTPClientConfig SigV4Config *sigv4.SigV4Config @@ -126,6 +128,7 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) { return &Client{ remoteName: name, urlString: conf.URL.String(), + version: conf.Version, Client: httpClient, timeout: time.Duration(conf.Timeout), readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), @@ -207,7 +210,12 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", UserAgent) - httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + if c.version == "1.1" { + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.1") // TODO-RW11: Final value? + } else { + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + } + if attempt > 0 { httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt)) } @@ -229,6 +237,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpResp.Body.Close() }() + // TODO-RW11: Here is where we need to handle version downgrade on error + if httpResp.StatusCode/100 != 2 { scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) line := "" @@ -270,6 +280,11 @@ func (c Client) Endpoint() string { return c.urlString } +// Version of the remote write client +func (c Client) Version() string { + return c.version +} + // Read reads from a remote endpoint. func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { c.readQueries.Inc() @@ -365,3 +380,7 @@ func (c *TestClient) Name() string { func (c *TestClient) Endpoint() string { return c.url } + +func (c *TestClient) Version() string { + return "" +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 17ec34d5e..133264429 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -386,6 +386,8 @@ type WriteClient interface { Name() string // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string + // Version is the remote write version for the storage client. + Version() string } // QueueManager manages a queue of samples to be sent to the Storage @@ -1412,7 +1414,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - if s.qm.internFormat { + if s.qm.internFormat && s.qm.client().Version() == "1.1" { + // the new internFormat bool must be set as well as the client version being 1.1 nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) @@ -1430,7 +1433,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - if s.qm.internFormat { + if s.qm.internFormat && s.qm.client().Version() == "1.1" { + // the new internFormat bool must be set as well as the client version being 1.1 nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, diff --git a/storage/remote/write.go b/storage/remote/write.go index ffa61e5fd..7e4f61466 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -167,6 +167,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { c, err := NewWriteClient(name, &ClientConfig{ URL: rwConf.URL, + Version: rwConf.Version, Timeout: rwConf.RemoteTimeout, HTTPClientConfig: rwConf.HTTPClientConfig, SigV4Config: rwConf.SigV4Config, diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 2aabcfa02..0efc0e951 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -67,6 +67,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var err error var req *prompb.WriteRequest var reqWithRefs *prompb.WriteRequestWithRefs + // TODO-RW11: Need to check headers to decide what version is and what to do if h.internFormat { reqWithRefs, err = DecodeReducedWriteRequest(r.Body) } else {