prometheus/storage/remote/client.go
Thibault Chataigner bf4a279a91 Remote storage reads based on oldest timestamp in primary storage (#3129)
Currently all read queries are simply pushed to remote read clients.
This is fine, except for remote storage for wich it unefficient and
make query slower even if remote read is unnecessary.
So we need instead to compare the oldest timestamp in primary/local
storage with the query range lower boundary. If the oldest timestamp
is older than the mint parameter, then there is no need for remote read.
This is an optionnal behavior per remote read client.

Signed-off-by: Thibault Chataigner <t.chataigner@criteo.com>
2017-10-18 12:08:14 +01:00

205 lines
5.5 KiB
Go

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/util/httputil"
)
const maxErrMsgLen = 256
// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
index int // Used to differentiate metrics.
url *config.URL
client *http.Client
timeout time.Duration
readRecent bool
}
type clientConfig struct {
url *config.URL
timeout model.Duration
readRecent bool
httpClientConfig config.HTTPClientConfig
}
// NewClient creates a new Client.
func NewClient(index int, conf *clientConfig) (*Client, error) {
httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig, "remote_storage")
if err != nil {
return nil, err
}
return &Client{
index: index,
url: conf.url,
client: httpClient,
timeout: time.Duration(conf.timeout),
readRecent: conf.readRecent,
}, nil
}
type recoverableError struct {
error
}
// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(samples model.Samples) error {
req := &prompb.WriteRequest{
Timeseries: make([]*prompb.TimeSeries, 0, len(samples)),
}
for _, s := range samples {
ts := &prompb.TimeSeries{
Labels: make([]*prompb.Label, 0, len(s.Metric)),
}
for k, v := range s.Metric {
ts.Labels = append(ts.Labels,
&prompb.Label{
Name: string(k),
Value: string(v),
})
}
ts.Samples = []*prompb.Sample{
{
Value: float64(s.Value),
Timestamp: int64(s.Timestamp),
},
}
req.Timeseries = append(req.Timeseries, ts)
}
data, err := proto.Marshal(req)
if err != nil {
return err
}
compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
if err != nil {
// Errors from NewRequest are from unparseable URLs, so are not
// recoverable.
return err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
// Errors from client.Do are from (for example) network errors, so are
// recoverable.
return recoverableError{err}
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
}
if httpResp.StatusCode/100 == 5 {
return recoverableError{err}
}
return err
}
// Name identifies the client.
func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url)
}
// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prompb.LabelMatcher) ([]*prompb.TimeSeries, error) {
req := &prompb.ReadRequest{
// TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it.
Queries: []*prompb.Query{{
StartTimestampMs: from,
EndTimestampMs: through,
Matchers: matchers,
}},
}
data, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to marshal read request: %v", err)
}
compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
if err != nil {
return nil, fmt.Errorf("unable to create request: %v", err)
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
}
compressed, err = ioutil.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}
uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}
var resp prompb.ReadResponse
err = proto.Unmarshal(uncompressed, &resp)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
}
if len(resp.Results) != len(req.Queries) {
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
}
return resp.Results[0].Timeseries, nil
}