Exposes remote storage http client to allow for customization ()

* Exposes remote write client to allow for customizations to the http client

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Removed changes to vendored file

Signed-off-by: Joe Elliott <number101010@gmail.com>
This commit is contained in:
Joe Elliott 2020-08-20 10:45:31 -04:00 committed by GitHub
parent 4e6a94a27d
commit 624075eafe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -77,11 +77,11 @@ func init() {
prometheus.MustRegister(remoteReadQueriesTotal, remoteReadQueries, remoteReadQueryDuration) prometheus.MustRegister(remoteReadQueriesTotal, remoteReadQueries, remoteReadQueryDuration)
} }
// client allows reading and writing from/to a remote HTTP endpoint. // Client allows reading and writing from/to a remote HTTP endpoint.
type client struct { type Client struct {
remoteName string // Used to differentiate clients in metrics. remoteName string // Used to differentiate clients in metrics.
url *config_util.URL url *config_util.URL
client *http.Client Client *http.Client
timeout time.Duration timeout time.Duration
readQueries prometheus.Gauge readQueries prometheus.Gauge
@ -109,10 +109,10 @@ func newReadClient(name string, conf *ClientConfig) (ReadClient, error) {
return nil, err return nil, err
} }
return &client{ return &Client{
remoteName: name, remoteName: name,
url: conf.URL, url: conf.URL,
client: httpClient, Client: httpClient,
timeout: time.Duration(conf.Timeout), timeout: time.Duration(conf.Timeout),
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
@ -132,10 +132,10 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
RoundTripper: t, RoundTripper: t,
} }
return &client{ return &Client{
remoteName: name, remoteName: name,
url: conf.URL, url: conf.URL,
client: httpClient, Client: httpClient,
timeout: time.Duration(conf.Timeout), timeout: time.Duration(conf.Timeout),
}, nil }, nil
} }
@ -146,7 +146,7 @@ type RecoverableError struct {
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
// and encoded bytes from codec.go. // and encoded bytes from codec.go.
func (c *client) Store(ctx context.Context, req []byte) error { func (c *Client) Store(ctx context.Context, req []byte) error {
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))
if err != nil { if err != nil {
// Errors from NewRequest are from unparsable URLs, so are not // Errors from NewRequest are from unparsable URLs, so are not
@ -173,9 +173,9 @@ func (c *client) Store(ctx context.Context, req []byte) error {
defer ht.Finish() defer ht.Finish()
} }
httpResp, err := c.client.Do(httpReq) httpResp, err := c.Client.Do(httpReq)
if err != nil { if err != nil {
// Errors from client.Do are from (for example) network errors, so are // Errors from Client.Do are from (for example) network errors, so are
// recoverable. // recoverable.
return RecoverableError{err} return RecoverableError{err}
} }
@ -199,17 +199,17 @@ func (c *client) Store(ctx context.Context, req []byte) error {
} }
// Name uniquely identifies the client. // Name uniquely identifies the client.
func (c client) Name() string { func (c Client) Name() string {
return c.remoteName return c.remoteName
} }
// Endpoint is the remote read or write endpoint. // Endpoint is the remote read or write endpoint.
func (c client) Endpoint() string { func (c Client) Endpoint() string {
return c.url.String() return c.url.String()
} }
// Read reads from a remote endpoint. // Read reads from a remote endpoint.
func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
c.readQueries.Inc() c.readQueries.Inc()
defer c.readQueries.Dec() defer c.readQueries.Dec()
@ -253,7 +253,7 @@ func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
} }
start := time.Now() start := time.Now()
httpResp, err := c.client.Do(httpReq) httpResp, err := c.Client.Do(httpReq)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error sending request") return nil, errors.Wrap(err, "error sending request")
} }