From 3f33175e0b744098eaa0c22b5ee5aab8e3d76e0a Mon Sep 17 00:00:00 2001 From: alexgreenbank Date: Fri, 20 Oct 2023 17:35:03 +0000 Subject: [PATCH] Remove config, update proto MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nicolás Pazos --- config/config.go | 4 ---- prompb/remote.proto | 3 ++- prompb/types.proto | 19 +++++++++++++++++++ storage/remote/client.go | 19 ++++++------------- storage/remote/queue_manager.go | 10 ++++------ storage/remote/write.go | 2 +- 6 files changed, 32 insertions(+), 25 deletions(-) diff --git a/config/config.go b/config/config.go index 2719e4ed0f..ddcca84dc7 100644 --- a/config/config.go +++ b/config/config.go @@ -1016,7 +1016,6 @@ func CheckTargetAddress(address model.LabelValue) error { // RemoteWriteConfig is the configuration for writing to remote storage. type RemoteWriteConfig struct { URL *config.URL `yaml:"url"` - Version string `yaml:"version,omitempty"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` Headers map[string]string `yaml:"headers,omitempty"` WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` @@ -1048,9 +1047,6 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err if c.URL == nil { return errors.New("url for remote_write is empty") } - if c.Version != "" && c.Version != "1.0" && c.Version != "1.1" { - return errors.New("version for remote_write must be either '1.0', '1.1' or blank/unspecified") - } for _, rlcfg := range c.WriteRelabelConfigs { if rlcfg == nil { return errors.New("empty or null relabeling rule in remote write config") diff --git a/prompb/remote.proto b/prompb/remote.proto index 358525fee7..b265da4053 100644 --- a/prompb/remote.proto +++ b/prompb/remote.proto @@ -33,7 +33,8 @@ message WriteRequestWithRefs { // Cortex uses this field to determine the source of the write request. // We reserve it to avoid any compatibility issues. reserved 2; - repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false]; + // Metadata (3) has moved to be part of the TimeSeries type + reserved 3; // Symbol table for label names/values. map string_symbol_table = 4 [(gogoproto.nullable) = false]; } diff --git a/prompb/types.proto b/prompb/types.proto index 8039559b26..07cf0fe3bd 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -38,6 +38,24 @@ message MetricMetadata { string unit = 5; } +message MetricMetadataRef { + enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGEHISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; + } + + // Represents the metric type, these match the set from Prometheus. + // Refer to model/textparse/interface.go for details. + MetricType type = 1; + int64 help_ref = 4; + int64 unit_ref = 5; +} message Sample { double value = 1; // timestamp is in ms format, see model/timestamp/timestamp.go for @@ -146,6 +164,7 @@ message ReducedTimeSeries { repeated Sample samples = 2 [(gogoproto.nullable) = false]; repeated ExemplarRef exemplars = 3 [(gogoproto.nullable) = false]; repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; + MetricMetadataRef metadata = 5 [(gogoproto.nullable) = false]; } message LabelRef { diff --git a/storage/remote/client.go b/storage/remote/client.go index 3860583381..a6a62a182f 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -83,7 +83,7 @@ func init() { type Client struct { remoteName string // Used to differentiate clients in metrics. urlString string // url.String() - version string // For write clients; "", "1.0" or "1.1", ignored for read clients + remotewrite11 bool Client *http.Client timeout time.Duration @@ -97,7 +97,7 @@ type Client struct { // ClientConfig configures a client. type ClientConfig struct { URL *config_util.URL - Version string + RemoteWrite11 bool Timeout model.Duration HTTPClientConfig config_util.HTTPClientConfig SigV4Config *sigv4.SigV4Config @@ -128,7 +128,7 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) { return &Client{ remoteName: name, urlString: conf.URL.String(), - version: conf.Version, + remotewrite11: conf.RemoteWrite11, Client: httpClient, timeout: time.Duration(conf.Timeout), readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), @@ -210,7 +210,9 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", UserAgent) - if c.version == "1.1" { + + // Set the right header if we're using v1.1 remote write protocol + if c.remotewrite11 { httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.1") // TODO-RW11: Final value? } else { httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") @@ -280,11 +282,6 @@ func (c Client) Endpoint() string { return c.urlString } -// Version of the remote write client -func (c Client) Version() string { - return c.version -} - // Read reads from a remote endpoint. func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { c.readQueries.Inc() @@ -380,7 +377,3 @@ func (c *TestClient) Name() string { func (c *TestClient) Endpoint() string { return c.url } - -func (c *TestClient) Version() string { - return "" -} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 133264429d..04ecb9181e 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -386,8 +386,6 @@ type WriteClient interface { Name() string // Endpoint is the remote read or write endpoint for the storage client. Endpoint() string - // Version is the remote write version for the storage client. - Version() string } // QueueManager manages a queue of samples to be sent to the Storage @@ -1414,8 +1412,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { if !ok { return } - if s.qm.internFormat && s.qm.client().Version() == "1.1" { - // the new internFormat bool must be set as well as the client version being 1.1 + if s.qm.internFormat { + // the new internFormat feature flag is be set nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) @@ -1433,8 +1431,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - if s.qm.internFormat && s.qm.client().Version() == "1.1" { - // the new internFormat bool must be set as well as the client version being 1.1 + if s.qm.internFormat { + // the new internFormat feature flag is set nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, diff --git a/storage/remote/write.go b/storage/remote/write.go index 7e4f614662..006e6fac6c 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -167,7 +167,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { c, err := NewWriteClient(name, &ClientConfig{ URL: rwConf.URL, - Version: rwConf.Version, + RemoteWrite11: rws.writeReducedProto, Timeout: rwConf.RemoteTimeout, HTTPClientConfig: rwConf.HTTPClientConfig, SigV4Config: rwConf.SigV4Config,