mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
Remove config, update proto
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
parent
d1d80b8bed
commit
3f33175e0b
|
@ -1016,7 +1016,6 @@ func CheckTargetAddress(address model.LabelValue) error {
|
||||||
// RemoteWriteConfig is the configuration for writing to remote storage.
|
// RemoteWriteConfig is the configuration for writing to remote storage.
|
||||||
type RemoteWriteConfig struct {
|
type RemoteWriteConfig struct {
|
||||||
URL *config.URL `yaml:"url"`
|
URL *config.URL `yaml:"url"`
|
||||||
Version string `yaml:"version,omitempty"`
|
|
||||||
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
|
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
|
||||||
Headers map[string]string `yaml:"headers,omitempty"`
|
Headers map[string]string `yaml:"headers,omitempty"`
|
||||||
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
|
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
|
||||||
|
@ -1048,9 +1047,6 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
|
||||||
if c.URL == nil {
|
if c.URL == nil {
|
||||||
return errors.New("url for remote_write is empty")
|
return errors.New("url for remote_write is empty")
|
||||||
}
|
}
|
||||||
if c.Version != "" && c.Version != "1.0" && c.Version != "1.1" {
|
|
||||||
return errors.New("version for remote_write must be either '1.0', '1.1' or blank/unspecified")
|
|
||||||
}
|
|
||||||
for _, rlcfg := range c.WriteRelabelConfigs {
|
for _, rlcfg := range c.WriteRelabelConfigs {
|
||||||
if rlcfg == nil {
|
if rlcfg == nil {
|
||||||
return errors.New("empty or null relabeling rule in remote write config")
|
return errors.New("empty or null relabeling rule in remote write config")
|
||||||
|
|
|
@ -33,7 +33,8 @@ message WriteRequestWithRefs {
|
||||||
// Cortex uses this field to determine the source of the write request.
|
// Cortex uses this field to determine the source of the write request.
|
||||||
// We reserve it to avoid any compatibility issues.
|
// We reserve it to avoid any compatibility issues.
|
||||||
reserved 2;
|
reserved 2;
|
||||||
repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
|
// Metadata (3) has moved to be part of the TimeSeries type
|
||||||
|
reserved 3;
|
||||||
// Symbol table for label names/values.
|
// Symbol table for label names/values.
|
||||||
map<uint64, string> string_symbol_table = 4 [(gogoproto.nullable) = false];
|
map<uint64, string> string_symbol_table = 4 [(gogoproto.nullable) = false];
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,24 @@ message MetricMetadata {
|
||||||
string unit = 5;
|
string unit = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message MetricMetadataRef {
|
||||||
|
enum MetricType {
|
||||||
|
UNKNOWN = 0;
|
||||||
|
COUNTER = 1;
|
||||||
|
GAUGE = 2;
|
||||||
|
HISTOGRAM = 3;
|
||||||
|
GAUGEHISTOGRAM = 4;
|
||||||
|
SUMMARY = 5;
|
||||||
|
INFO = 6;
|
||||||
|
STATESET = 7;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Represents the metric type, these match the set from Prometheus.
|
||||||
|
// Refer to model/textparse/interface.go for details.
|
||||||
|
MetricType type = 1;
|
||||||
|
int64 help_ref = 4;
|
||||||
|
int64 unit_ref = 5;
|
||||||
|
}
|
||||||
message Sample {
|
message Sample {
|
||||||
double value = 1;
|
double value = 1;
|
||||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
||||||
|
@ -146,6 +164,7 @@ message ReducedTimeSeries {
|
||||||
repeated Sample samples = 2 [(gogoproto.nullable) = false];
|
repeated Sample samples = 2 [(gogoproto.nullable) = false];
|
||||||
repeated ExemplarRef exemplars = 3 [(gogoproto.nullable) = false];
|
repeated ExemplarRef exemplars = 3 [(gogoproto.nullable) = false];
|
||||||
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
|
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
|
||||||
|
MetricMetadataRef metadata = 5 [(gogoproto.nullable) = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message LabelRef {
|
message LabelRef {
|
||||||
|
|
|
@ -83,7 +83,7 @@ func init() {
|
||||||
type Client struct {
|
type Client struct {
|
||||||
remoteName string // Used to differentiate clients in metrics.
|
remoteName string // Used to differentiate clients in metrics.
|
||||||
urlString string // url.String()
|
urlString string // url.String()
|
||||||
version string // For write clients; "", "1.0" or "1.1", ignored for read clients
|
remotewrite11 bool
|
||||||
Client *http.Client
|
Client *http.Client
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ type Client struct {
|
||||||
// ClientConfig configures a client.
|
// ClientConfig configures a client.
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
URL *config_util.URL
|
URL *config_util.URL
|
||||||
Version string
|
RemoteWrite11 bool
|
||||||
Timeout model.Duration
|
Timeout model.Duration
|
||||||
HTTPClientConfig config_util.HTTPClientConfig
|
HTTPClientConfig config_util.HTTPClientConfig
|
||||||
SigV4Config *sigv4.SigV4Config
|
SigV4Config *sigv4.SigV4Config
|
||||||
|
@ -128,7 +128,7 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
|
||||||
return &Client{
|
return &Client{
|
||||||
remoteName: name,
|
remoteName: name,
|
||||||
urlString: conf.URL.String(),
|
urlString: conf.URL.String(),
|
||||||
version: conf.Version,
|
remotewrite11: conf.RemoteWrite11,
|
||||||
Client: httpClient,
|
Client: httpClient,
|
||||||
timeout: time.Duration(conf.Timeout),
|
timeout: time.Duration(conf.Timeout),
|
||||||
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
|
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
|
||||||
|
@ -210,7 +210,9 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
|
||||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||||
httpReq.Header.Set("Content-Type", "application/x-protobuf")
|
httpReq.Header.Set("Content-Type", "application/x-protobuf")
|
||||||
httpReq.Header.Set("User-Agent", UserAgent)
|
httpReq.Header.Set("User-Agent", UserAgent)
|
||||||
if c.version == "1.1" {
|
|
||||||
|
// Set the right header if we're using v1.1 remote write protocol
|
||||||
|
if c.remotewrite11 {
|
||||||
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.1") // TODO-RW11: Final value?
|
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.1") // TODO-RW11: Final value?
|
||||||
} else {
|
} else {
|
||||||
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||||
|
@ -280,11 +282,6 @@ func (c Client) Endpoint() string {
|
||||||
return c.urlString
|
return c.urlString
|
||||||
}
|
}
|
||||||
|
|
||||||
// Version of the remote write client
|
|
||||||
func (c Client) Version() string {
|
|
||||||
return c.version
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read reads from a remote endpoint.
|
// Read reads from a remote endpoint.
|
||||||
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||||
c.readQueries.Inc()
|
c.readQueries.Inc()
|
||||||
|
@ -380,7 +377,3 @@ func (c *TestClient) Name() string {
|
||||||
func (c *TestClient) Endpoint() string {
|
func (c *TestClient) Endpoint() string {
|
||||||
return c.url
|
return c.url
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestClient) Version() string {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
|
@ -386,8 +386,6 @@ type WriteClient interface {
|
||||||
Name() string
|
Name() string
|
||||||
// Endpoint is the remote read or write endpoint for the storage client.
|
// Endpoint is the remote read or write endpoint for the storage client.
|
||||||
Endpoint() string
|
Endpoint() string
|
||||||
// Version is the remote write version for the storage client.
|
|
||||||
Version() string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueManager manages a queue of samples to be sent to the Storage
|
// QueueManager manages a queue of samples to be sent to the Storage
|
||||||
|
@ -1414,8 +1412,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if s.qm.internFormat && s.qm.client().Version() == "1.1" {
|
if s.qm.internFormat {
|
||||||
// the new internFormat bool must be set as well as the client version being 1.1
|
// the new internFormat feature flag is be set
|
||||||
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
|
s.sendReducedSamples(ctx, pendingReducedData[:n], pool.table, nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
|
||||||
|
@ -1433,8 +1431,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
batch := queue.Batch()
|
batch := queue.Batch()
|
||||||
if len(batch) > 0 {
|
if len(batch) > 0 {
|
||||||
if s.qm.internFormat && s.qm.client().Version() == "1.1" {
|
if s.qm.internFormat {
|
||||||
// the new internFormat bool must be set as well as the client version being 1.1
|
// the new internFormat feature flag is set
|
||||||
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
nPendingSamples, nPendingExemplars, nPendingHistograms := populateReducedTimeSeries(pool, batch, pendingReducedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
|
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
|
||||||
|
|
|
@ -167,7 +167,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
|
|
||||||
c, err := NewWriteClient(name, &ClientConfig{
|
c, err := NewWriteClient(name, &ClientConfig{
|
||||||
URL: rwConf.URL,
|
URL: rwConf.URL,
|
||||||
Version: rwConf.Version,
|
RemoteWrite11: rws.writeReducedProto,
|
||||||
Timeout: rwConf.RemoteTimeout,
|
Timeout: rwConf.RemoteTimeout,
|
||||||
HTTPClientConfig: rwConf.HTTPClientConfig,
|
HTTPClientConfig: rwConf.HTTPClientConfig,
|
||||||
SigV4Config: rwConf.SigV4Config,
|
SigV4Config: rwConf.SigV4Config,
|
||||||
|
|
Loading…
Reference in a new issue