mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 09:04:06 -08:00
Merge pull request #3336 from tomwilkie/remote-api-update
Update remote storage APIs from 1.8 branch
This commit is contained in:
commit
7aa2930ac5
|
@ -44,26 +44,27 @@ type Client struct {
|
||||||
readRecent bool
|
readRecent bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientConfig struct {
|
// ClientConfig configures a Client.
|
||||||
url *config.URL
|
type ClientConfig struct {
|
||||||
timeout model.Duration
|
URL *config.URL
|
||||||
readRecent bool
|
Timeout model.Duration
|
||||||
httpClientConfig config.HTTPClientConfig
|
ReadRecent bool
|
||||||
|
HTTPClientConfig config.HTTPClientConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates a new Client.
|
// NewClient creates a new Client.
|
||||||
func NewClient(index int, conf *clientConfig) (*Client, error) {
|
func NewClient(index int, conf *ClientConfig) (*Client, error) {
|
||||||
httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig, "remote_storage")
|
httpClient, err := httputil.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
index: index,
|
index: index,
|
||||||
url: conf.url,
|
url: conf.URL,
|
||||||
client: httpClient,
|
client: httpClient,
|
||||||
timeout: time.Duration(conf.timeout),
|
timeout: time.Duration(conf.Timeout),
|
||||||
readRecent: conf.readRecent,
|
readRecent: conf.ReadRecent,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,30 +73,7 @@ type recoverableError struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store sends a batch of samples to the HTTP endpoint.
|
// Store sends a batch of samples to the HTTP endpoint.
|
||||||
func (c *Client) Store(samples model.Samples) error {
|
func (c *Client) Store(req *prompb.WriteRequest) error {
|
||||||
req := &prompb.WriteRequest{
|
|
||||||
Timeseries: make([]*prompb.TimeSeries, 0, len(samples)),
|
|
||||||
}
|
|
||||||
for _, s := range samples {
|
|
||||||
ts := &prompb.TimeSeries{
|
|
||||||
Labels: make([]*prompb.Label, 0, len(s.Metric)),
|
|
||||||
}
|
|
||||||
for k, v := range s.Metric {
|
|
||||||
ts.Labels = append(ts.Labels,
|
|
||||||
&prompb.Label{
|
|
||||||
Name: string(k),
|
|
||||||
Value: string(v),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
ts.Samples = []*prompb.Sample{
|
|
||||||
{
|
|
||||||
Value: float64(s.Value),
|
|
||||||
Timestamp: int64(s.Timestamp),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
req.Timeseries = append(req.Timeseries, ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := proto.Marshal(req)
|
data, err := proto.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -143,17 +121,14 @@ func (c Client) Name() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read reads from a remote endpoint.
|
// Read reads from a remote endpoint.
|
||||||
func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prompb.LabelMatcher) ([]*prompb.TimeSeries, error) {
|
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||||
req := &prompb.ReadRequest{
|
req := &prompb.ReadRequest{
|
||||||
// TODO: Support batching multiple queries into one read request,
|
// TODO: Support batching multiple queries into one read request,
|
||||||
// as the protobuf interface allows for it.
|
// as the protobuf interface allows for it.
|
||||||
Queries: []*prompb.Query{{
|
Queries: []*prompb.Query{
|
||||||
StartTimestampMs: from,
|
query,
|
||||||
EndTimestampMs: through,
|
},
|
||||||
Matchers: matchers,
|
|
||||||
}},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := proto.Marshal(req)
|
data, err := proto.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to marshal read request: %v", err)
|
return nil, fmt.Errorf("unable to marshal read request: %v", err)
|
||||||
|
@ -200,5 +175,5 @@ func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prom
|
||||||
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
|
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.Results[0].Timeseries, nil
|
return resp.Results[0], nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
)
|
)
|
||||||
|
|
||||||
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
|
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
|
||||||
|
@ -64,15 +65,15 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := NewClient(0, &clientConfig{
|
c, err := NewClient(0, &ClientConfig{
|
||||||
url: &config.URL{serverURL},
|
URL: &config.URL{URL: serverURL},
|
||||||
timeout: model.Duration(time.Second),
|
Timeout: model.Duration(time.Second),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.Store(nil)
|
err = c.Store(&prompb.WriteRequest{})
|
||||||
if !reflect.DeepEqual(err, test.err) {
|
if !reflect.DeepEqual(err, test.err) {
|
||||||
t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
|
t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
|
||||||
}
|
}
|
||||||
|
|
368
storage/remote/codec.go
Normal file
368
storage/remote/codec.go
Normal file
|
@ -0,0 +1,368 @@
|
||||||
|
// Copyright 2017 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package remote
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/golang/snappy"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DecodeReadRequest reads a remote.Request from a http.Request.
|
||||||
|
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) {
|
||||||
|
compressed, err := ioutil.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqBuf, err := snappy.Decode(nil, compressed)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var req prompb.ReadRequest
|
||||||
|
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncodeReadResponse writes a remote.Response to a http.ResponseWriter.
|
||||||
|
func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error {
|
||||||
|
data, err := proto.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||||
|
w.Header().Set("Content-Encoding", "snappy")
|
||||||
|
|
||||||
|
compressed := snappy.Encode(nil, data)
|
||||||
|
_, err = w.Write(compressed)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToWriteRequest converts an array of samples into a WriteRequest proto.
|
||||||
|
func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest {
|
||||||
|
req := &prompb.WriteRequest{
|
||||||
|
Timeseries: make([]*prompb.TimeSeries, 0, len(samples)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range samples {
|
||||||
|
ts := prompb.TimeSeries{
|
||||||
|
Labels: MetricToLabelProtos(s.Metric),
|
||||||
|
Samples: []*prompb.Sample{
|
||||||
|
{
|
||||||
|
Value: float64(s.Value),
|
||||||
|
Timestamp: int64(s.Timestamp),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
req.Timeseries = append(req.Timeseries, &ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToQuery builds a Query proto.
|
||||||
|
func ToQuery(from, to int64, matchers []*labels.Matcher) (*prompb.Query, error) {
|
||||||
|
ms, err := toLabelMatchers(matchers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &prompb.Query{
|
||||||
|
StartTimestampMs: from,
|
||||||
|
EndTimestampMs: to,
|
||||||
|
Matchers: ms,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromQuery unpacks a Query proto.
|
||||||
|
func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, error) {
|
||||||
|
matchers, err := fromLabelMatchers(req.Matchers)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
return req.StartTimestampMs, req.EndTimestampMs, matchers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToQueryResult builds a QueryResult proto.
|
||||||
|
func ToQueryResult(ss storage.SeriesSet) (*prompb.QueryResult, error) {
|
||||||
|
resp := &prompb.QueryResult{}
|
||||||
|
for ss.Next() {
|
||||||
|
series := ss.At()
|
||||||
|
iter := series.Iterator()
|
||||||
|
samples := []*prompb.Sample{}
|
||||||
|
|
||||||
|
for iter.Next() {
|
||||||
|
ts, val := iter.At()
|
||||||
|
samples = append(samples, &prompb.Sample{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := iter.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
|
||||||
|
Labels: labelsToLabelsProto(series.Labels()),
|
||||||
|
Samples: samples,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := ss.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromQueryResult unpacks a QueryResult proto.
|
||||||
|
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
|
||||||
|
series := make([]storage.Series, 0, len(res.Timeseries))
|
||||||
|
for _, ts := range res.Timeseries {
|
||||||
|
labels := labelProtosToLabels(ts.Labels)
|
||||||
|
if err := validateLabelsAndMetricName(labels); err != nil {
|
||||||
|
return errSeriesSet{err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
series = append(series, &concreteSeries{
|
||||||
|
labels: labels,
|
||||||
|
samples: ts.Samples,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Sort(byLabel(series))
|
||||||
|
return &concreteSeriesSet{
|
||||||
|
series: series,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// errSeriesSet implements storage.SeriesSet, just returning an error.
|
||||||
|
type errSeriesSet struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (errSeriesSet) Next() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (errSeriesSet) At() storage.Series {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errSeriesSet) Err() error {
|
||||||
|
return e.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// concreteSeriesSet implements storage.SeriesSet.
|
||||||
|
type concreteSeriesSet struct {
|
||||||
|
cur int
|
||||||
|
series []storage.Series
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *concreteSeriesSet) Next() bool {
|
||||||
|
c.cur++
|
||||||
|
return c.cur-1 < len(c.series)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *concreteSeriesSet) At() storage.Series {
|
||||||
|
return c.series[c.cur-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *concreteSeriesSet) Err() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// concreteSeries implementes storage.Series.
|
||||||
|
type concreteSeries struct {
|
||||||
|
labels labels.Labels
|
||||||
|
samples []*prompb.Sample
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *concreteSeries) Labels() labels.Labels {
|
||||||
|
return c.labels
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *concreteSeries) Iterator() storage.SeriesIterator {
|
||||||
|
return newConcreteSeriersIterator(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// concreteSeriesIterator implements storage.SeriesIterator.
|
||||||
|
type concreteSeriesIterator struct {
|
||||||
|
cur int
|
||||||
|
series *concreteSeries
|
||||||
|
}
|
||||||
|
|
||||||
|
func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator {
|
||||||
|
return &concreteSeriesIterator{
|
||||||
|
cur: -1,
|
||||||
|
series: series,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek implements storage.SeriesIterator.
|
||||||
|
func (c *concreteSeriesIterator) Seek(t int64) bool {
|
||||||
|
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
|
||||||
|
return c.series.samples[n].Timestamp >= t
|
||||||
|
})
|
||||||
|
return c.cur < len(c.series.samples)
|
||||||
|
}
|
||||||
|
|
||||||
|
// At implements storage.SeriesIterator.
|
||||||
|
func (c *concreteSeriesIterator) At() (t int64, v float64) {
|
||||||
|
s := c.series.samples[c.cur]
|
||||||
|
return s.Timestamp, s.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next implements storage.SeriesIterator.
|
||||||
|
func (c *concreteSeriesIterator) Next() bool {
|
||||||
|
c.cur++
|
||||||
|
return c.cur < len(c.series.samples)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err implements storage.SeriesIterator.
|
||||||
|
func (c *concreteSeriesIterator) Err() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read.
|
||||||
|
func validateLabelsAndMetricName(ls labels.Labels) error {
|
||||||
|
for _, l := range ls {
|
||||||
|
if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) {
|
||||||
|
return fmt.Errorf("Invalid metric name: %v", l.Value)
|
||||||
|
}
|
||||||
|
if !model.LabelName(l.Name).IsValid() {
|
||||||
|
return fmt.Errorf("Invalid label name: %v", l.Name)
|
||||||
|
}
|
||||||
|
if !model.LabelValue(l.Value).IsValid() {
|
||||||
|
return fmt.Errorf("Invalid label value: %v", l.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
|
||||||
|
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
|
||||||
|
for _, m := range matchers {
|
||||||
|
var mType prompb.LabelMatcher_Type
|
||||||
|
switch m.Type {
|
||||||
|
case labels.MatchEqual:
|
||||||
|
mType = prompb.LabelMatcher_EQ
|
||||||
|
case labels.MatchNotEqual:
|
||||||
|
mType = prompb.LabelMatcher_NEQ
|
||||||
|
case labels.MatchRegexp:
|
||||||
|
mType = prompb.LabelMatcher_RE
|
||||||
|
case labels.MatchNotRegexp:
|
||||||
|
mType = prompb.LabelMatcher_NRE
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid matcher type")
|
||||||
|
}
|
||||||
|
pbMatchers = append(pbMatchers, &prompb.LabelMatcher{
|
||||||
|
Type: mType,
|
||||||
|
Name: m.Name,
|
||||||
|
Value: m.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return pbMatchers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) {
|
||||||
|
result := make([]*labels.Matcher, 0, len(matchers))
|
||||||
|
for _, matcher := range matchers {
|
||||||
|
var mtype labels.MatchType
|
||||||
|
switch matcher.Type {
|
||||||
|
case prompb.LabelMatcher_EQ:
|
||||||
|
mtype = labels.MatchEqual
|
||||||
|
case prompb.LabelMatcher_NEQ:
|
||||||
|
mtype = labels.MatchNotEqual
|
||||||
|
case prompb.LabelMatcher_RE:
|
||||||
|
mtype = labels.MatchRegexp
|
||||||
|
case prompb.LabelMatcher_NRE:
|
||||||
|
mtype = labels.MatchNotRegexp
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid matcher type")
|
||||||
|
}
|
||||||
|
matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, matcher)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetricToLabelProtos builds a []*prompb.Label from a model.Metric
|
||||||
|
func MetricToLabelProtos(metric model.Metric) []*prompb.Label {
|
||||||
|
labels := make([]*prompb.Label, 0, len(metric))
|
||||||
|
for k, v := range metric {
|
||||||
|
labels = append(labels, &prompb.Label{
|
||||||
|
Name: string(k),
|
||||||
|
Value: string(v),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Slice(labels, func(i int, j int) bool {
|
||||||
|
return labels[i].Name < labels[j].Name
|
||||||
|
})
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
|
||||||
|
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
||||||
|
metric := make(model.Metric, len(labelPairs))
|
||||||
|
for _, l := range labelPairs {
|
||||||
|
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
|
||||||
|
}
|
||||||
|
return metric
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels {
|
||||||
|
result := make(labels.Labels, 0, len(labelPairs))
|
||||||
|
for _, l := range labelPairs {
|
||||||
|
result = append(result, labels.Label{
|
||||||
|
Name: l.Name,
|
||||||
|
Value: l.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Sort(result)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelsToLabelsProto(labels labels.Labels) []*prompb.Label {
|
||||||
|
result := make([]*prompb.Label, 0, len(labels))
|
||||||
|
for _, l := range labels {
|
||||||
|
result = append(result, &prompb.Label{
|
||||||
|
Name: l.Name,
|
||||||
|
Value: l.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelsToMetric(ls labels.Labels) model.Metric {
|
||||||
|
metric := make(model.Metric, len(ls))
|
||||||
|
for _, l := range ls {
|
||||||
|
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
|
||||||
|
}
|
||||||
|
return metric
|
||||||
|
}
|
126
storage/remote/codec_test.go
Normal file
126
storage/remote/codec_test.go
Normal file
|
@ -0,0 +1,126 @@
|
||||||
|
// Copyright 2017 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package remote
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestValidateLabelsAndMetricName(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input labels.Labels
|
||||||
|
expectedErr string
|
||||||
|
shouldPass bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "name",
|
||||||
|
"labelName", "labelValue",
|
||||||
|
),
|
||||||
|
expectedErr: "",
|
||||||
|
shouldPass: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "name",
|
||||||
|
"_labelName", "labelValue",
|
||||||
|
),
|
||||||
|
expectedErr: "",
|
||||||
|
shouldPass: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "name",
|
||||||
|
"@labelName", "labelValue",
|
||||||
|
),
|
||||||
|
expectedErr: "Invalid label name: @labelName",
|
||||||
|
shouldPass: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "name",
|
||||||
|
"123labelName", "labelValue",
|
||||||
|
),
|
||||||
|
expectedErr: "Invalid label name: 123labelName",
|
||||||
|
shouldPass: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "name",
|
||||||
|
"", "labelValue",
|
||||||
|
),
|
||||||
|
expectedErr: "Invalid label name: ",
|
||||||
|
shouldPass: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "name",
|
||||||
|
"labelName", string([]byte{0xff}),
|
||||||
|
),
|
||||||
|
expectedErr: "Invalid label value: " + string([]byte{0xff}),
|
||||||
|
shouldPass: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: labels.FromStrings(
|
||||||
|
"__name__", "@invalid_name",
|
||||||
|
),
|
||||||
|
expectedErr: "Invalid metric name: @invalid_name",
|
||||||
|
shouldPass: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
err := validateLabelsAndMetricName(test.input)
|
||||||
|
if test.shouldPass != (err == nil) {
|
||||||
|
if test.shouldPass {
|
||||||
|
t.Fatalf("Test should pass, got unexpected error: %v", err)
|
||||||
|
} else {
|
||||||
|
t.Fatalf("Test should fail, unexpected error, got: %v, expected: %v", err, test.expectedErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcreteSeriesSet(t *testing.T) {
|
||||||
|
series1 := &concreteSeries{
|
||||||
|
labels: labels.FromStrings("foo", "bar"),
|
||||||
|
samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}},
|
||||||
|
}
|
||||||
|
series2 := &concreteSeries{
|
||||||
|
labels: labels.FromStrings("foo", "baz"),
|
||||||
|
samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}},
|
||||||
|
}
|
||||||
|
c := &concreteSeriesSet{
|
||||||
|
series: []storage.Series{series1, series2},
|
||||||
|
}
|
||||||
|
if !c.Next() {
|
||||||
|
t.Fatalf("Expected Next() to be true.")
|
||||||
|
}
|
||||||
|
if c.At() != series1 {
|
||||||
|
t.Fatalf("Unexpected series returned.")
|
||||||
|
}
|
||||||
|
if !c.Next() {
|
||||||
|
t.Fatalf("Expected Next() to be true.")
|
||||||
|
}
|
||||||
|
if c.At() != series2 {
|
||||||
|
t.Fatalf("Unexpected series returned.")
|
||||||
|
}
|
||||||
|
if c.Next() {
|
||||||
|
t.Fatalf("Expected Next() to be false.")
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/relabel"
|
"github.com/prometheus/prometheus/relabel"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -124,47 +125,11 @@ func init() {
|
||||||
prometheus.MustRegister(numShards)
|
prometheus.MustRegister(numShards)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueManagerConfig is the configuration for the queue used to write to remote
|
|
||||||
// storage.
|
|
||||||
type QueueManagerConfig struct {
|
|
||||||
// Number of samples to buffer per shard before we start dropping them.
|
|
||||||
QueueCapacity int
|
|
||||||
// Max number of shards, i.e. amount of concurrency.
|
|
||||||
MaxShards int
|
|
||||||
// Maximum number of samples per send.
|
|
||||||
MaxSamplesPerSend int
|
|
||||||
// Maximum time sample will wait in buffer.
|
|
||||||
BatchSendDeadline time.Duration
|
|
||||||
// Max number of times to retry a batch on recoverable errors.
|
|
||||||
MaxRetries int
|
|
||||||
// On recoverable errors, backoff exponentially.
|
|
||||||
MinBackoff time.Duration
|
|
||||||
MaxBackoff time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// defaultQueueManagerConfig is the default remote queue configuration.
|
|
||||||
var defaultQueueManagerConfig = QueueManagerConfig{
|
|
||||||
// With a maximum of 1000 shards, assuming an average of 100ms remote write
|
|
||||||
// time and 100 samples per batch, we will be able to push 1M samples/s.
|
|
||||||
MaxShards: 1000,
|
|
||||||
MaxSamplesPerSend: 100,
|
|
||||||
|
|
||||||
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
|
|
||||||
// 1000 shards, this will buffer 100M samples total.
|
|
||||||
QueueCapacity: 100 * 1000,
|
|
||||||
BatchSendDeadline: 5 * time.Second,
|
|
||||||
|
|
||||||
// Max number of times to retry a batch on recoverable errors.
|
|
||||||
MaxRetries: 10,
|
|
||||||
MinBackoff: 30 * time.Millisecond,
|
|
||||||
MaxBackoff: 100 * time.Millisecond,
|
|
||||||
}
|
|
||||||
|
|
||||||
// StorageClient defines an interface for sending a batch of samples to an
|
// StorageClient defines an interface for sending a batch of samples to an
|
||||||
// external timeseries database.
|
// external timeseries database.
|
||||||
type StorageClient interface {
|
type StorageClient interface {
|
||||||
// Store stores the given samples in the remote storage.
|
// Store stores the given samples in the remote storage.
|
||||||
Store(model.Samples) error
|
Store(*prompb.WriteRequest) error
|
||||||
// Name identifies the remote storage implementation.
|
// Name identifies the remote storage implementation.
|
||||||
Name() string
|
Name() string
|
||||||
}
|
}
|
||||||
|
@ -174,7 +139,7 @@ type StorageClient interface {
|
||||||
type QueueManager struct {
|
type QueueManager struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
|
||||||
cfg QueueManagerConfig
|
cfg config.QueueConfig
|
||||||
externalLabels model.LabelSet
|
externalLabels model.LabelSet
|
||||||
relabelConfigs []*config.RelabelConfig
|
relabelConfigs []*config.RelabelConfig
|
||||||
client StorageClient
|
client StorageClient
|
||||||
|
@ -193,7 +158,7 @@ type QueueManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueManager builds a new QueueManager.
|
// NewQueueManager builds a new QueueManager.
|
||||||
func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
|
func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
@ -216,7 +181,13 @@ func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels m
|
||||||
}
|
}
|
||||||
t.shards = t.newShards(t.numShards)
|
t.shards = t.newShards(t.numShards)
|
||||||
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
|
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
|
||||||
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
|
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity))
|
||||||
|
|
||||||
|
// Initialise counter labels to zero.
|
||||||
|
sentBatchDuration.WithLabelValues(t.queueName)
|
||||||
|
succeededSamplesTotal.WithLabelValues(t.queueName)
|
||||||
|
failedSamplesTotal.WithLabelValues(t.queueName)
|
||||||
|
droppedSamplesTotal.WithLabelValues(t.queueName)
|
||||||
|
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
@ -408,7 +379,7 @@ type shards struct {
|
||||||
func (t *QueueManager) newShards(numShards int) *shards {
|
func (t *QueueManager) newShards(numShards int) *shards {
|
||||||
queues := make([]chan *model.Sample, numShards)
|
queues := make([]chan *model.Sample, numShards)
|
||||||
for i := 0; i < numShards; i++ {
|
for i := 0; i < numShards; i++ {
|
||||||
queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
|
queues[i] = make(chan *model.Sample, t.cfg.Capacity)
|
||||||
}
|
}
|
||||||
s := &shards{
|
s := &shards{
|
||||||
qm: t,
|
qm: t,
|
||||||
|
@ -502,7 +473,8 @@ func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
|
||||||
backoff := s.qm.cfg.MinBackoff
|
backoff := s.qm.cfg.MinBackoff
|
||||||
for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
|
for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
err := s.qm.client.Store(samples)
|
req := ToWriteRequest(samples)
|
||||||
|
err := s.qm.client.Store(req)
|
||||||
|
|
||||||
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
|
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -15,25 +15,28 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TestStorageClient struct {
|
type TestStorageClient struct {
|
||||||
receivedSamples map[string]model.Samples
|
receivedSamples map[string][]*prompb.Sample
|
||||||
expectedSamples map[string]model.Samples
|
expectedSamples map[string][]*prompb.Sample
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestStorageClient() *TestStorageClient {
|
func NewTestStorageClient() *TestStorageClient {
|
||||||
return &TestStorageClient{
|
return &TestStorageClient{
|
||||||
receivedSamples: map[string]model.Samples{},
|
receivedSamples: map[string][]*prompb.Sample{},
|
||||||
expectedSamples: map[string]model.Samples{},
|
expectedSamples: map[string][]*prompb.Sample{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,8 +45,11 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) {
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
for _, s := range ss {
|
for _, s := range ss {
|
||||||
ts := s.Metric.String()
|
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
|
||||||
c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
|
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
|
||||||
|
Timestamp: int64(s.Timestamp),
|
||||||
|
Value: float64(s.Value),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
c.wg.Add(len(ss))
|
c.wg.Add(len(ss))
|
||||||
}
|
}
|
||||||
|
@ -54,23 +60,24 @@ func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
for ts, expectedSamples := range c.expectedSamples {
|
for ts, expectedSamples := range c.expectedSamples {
|
||||||
for i, expected := range expectedSamples {
|
if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) {
|
||||||
if !expected.Equal(c.receivedSamples[ts][i]) {
|
t.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts])
|
||||||
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestStorageClient) Store(ss model.Samples) error {
|
func (c *TestStorageClient) Store(req *prompb.WriteRequest) error {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
count := 0
|
||||||
for _, s := range ss {
|
for _, ts := range req.Timeseries {
|
||||||
ts := s.Metric.String()
|
labels := labelProtosToLabels(ts.Labels).String()
|
||||||
c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
|
for _, sample := range ts.Samples {
|
||||||
|
count++
|
||||||
|
c.receivedSamples[labels] = append(c.receivedSamples[labels], sample)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
c.wg.Add(-len(ss))
|
c.wg.Add(-count)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,7 +88,7 @@ func (c *TestStorageClient) Name() string {
|
||||||
func TestSampleDelivery(t *testing.T) {
|
func TestSampleDelivery(t *testing.T) {
|
||||||
// Let's create an even number of send batches so we don't run into the
|
// Let's create an even number of send batches so we don't run into the
|
||||||
// batch timeout case.
|
// batch timeout case.
|
||||||
n := defaultQueueManagerConfig.QueueCapacity * 2
|
n := config.DefaultQueueConfig.Capacity * 2
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -97,7 +104,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
c.expectSamples(samples[:len(samples)/2])
|
c.expectSamples(samples[:len(samples)/2])
|
||||||
|
|
||||||
cfg := defaultQueueManagerConfig
|
cfg := config.DefaultQueueConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
m := NewQueueManager(nil, cfg, nil, nil, c)
|
m := NewQueueManager(nil, cfg, nil, nil, c)
|
||||||
|
|
||||||
|
@ -117,7 +124,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
|
|
||||||
func TestSampleDeliveryOrder(t *testing.T) {
|
func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
ts := 10
|
ts := 10
|
||||||
n := defaultQueueManagerConfig.MaxSamplesPerSend * ts
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -133,7 +140,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
|
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
c.expectSamples(samples)
|
c.expectSamples(samples)
|
||||||
m := NewQueueManager(nil, defaultQueueManagerConfig, nil, nil, c)
|
m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c)
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
|
@ -161,7 +168,7 @@ func NewTestBlockedStorageClient() *TestBlockingStorageClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingStorageClient) Store(s model.Samples) error {
|
func (c *TestBlockingStorageClient) Store(_ *prompb.WriteRequest) error {
|
||||||
atomic.AddUint64(&c.numCalls, 1)
|
atomic.AddUint64(&c.numCalls, 1)
|
||||||
<-c.block
|
<-c.block
|
||||||
return nil
|
return nil
|
||||||
|
@ -194,7 +201,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
||||||
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
||||||
// should be left on the queue.
|
// should be left on the queue.
|
||||||
n := defaultQueueManagerConfig.MaxSamplesPerSend * 2
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -208,9 +215,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
c := NewTestBlockedStorageClient()
|
c := NewTestBlockedStorageClient()
|
||||||
cfg := defaultQueueManagerConfig
|
cfg := config.DefaultQueueConfig
|
||||||
cfg.MaxShards = 1
|
cfg.MaxShards = 1
|
||||||
cfg.QueueCapacity = n
|
cfg.Capacity = n
|
||||||
m := NewQueueManager(nil, cfg, nil, nil, c)
|
m := NewQueueManager(nil, cfg, nil, nil, c)
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
@ -240,7 +247,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
|
if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend {
|
||||||
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
|
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
|
||||||
m.queueLen(),
|
m.queueLen(),
|
||||||
)
|
)
|
||||||
|
|
|
@ -15,17 +15,14 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Querier returns a new Querier on the storage.
|
// Querier returns a new Querier on the storage.
|
||||||
func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -47,6 +44,7 @@ func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
queriers = append(queriers, &querier{
|
queriers = append(queriers, &querier{
|
||||||
|
ctx: ctx,
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: cmaxt,
|
maxt: cmaxt,
|
||||||
client: c,
|
client: c,
|
||||||
|
@ -61,6 +59,7 @@ var newMergeQueriers = storage.NewMergeQuerier
|
||||||
|
|
||||||
// Querier is an adapter to make a Client usable as a storage.Querier.
|
// Querier is an adapter to make a Client usable as a storage.Querier.
|
||||||
type querier struct {
|
type querier struct {
|
||||||
|
ctx context.Context
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
client *Client
|
client *Client
|
||||||
externalLabels model.LabelSet
|
externalLabels model.LabelSet
|
||||||
|
@ -70,61 +69,19 @@ type querier struct {
|
||||||
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
||||||
m, added := q.addExternalLabels(matchers)
|
m, added := q.addExternalLabels(matchers)
|
||||||
|
|
||||||
res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m))
|
query, err := ToQuery(q.mint, q.maxt, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errSeriesSet{err: err}
|
return errSeriesSet{err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
series := make([]storage.Series, 0, len(res))
|
res, err := q.client.Read(q.ctx, query)
|
||||||
for _, ts := range res {
|
if err != nil {
|
||||||
labels := labelPairsToLabels(ts.Labels)
|
return errSeriesSet{err: err}
|
||||||
removeLabels(&labels, added)
|
|
||||||
series = append(series, &concreteSeries{
|
|
||||||
labels: labels,
|
|
||||||
samples: ts.Samples,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
sort.Sort(byLabel(series))
|
|
||||||
return &concreteSeriesSet{
|
|
||||||
series: series,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func labelMatchersToProto(matchers []*labels.Matcher) []*prompb.LabelMatcher {
|
seriesSet := FromQueryResult(res)
|
||||||
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
|
|
||||||
for _, m := range matchers {
|
|
||||||
var mType prompb.LabelMatcher_Type
|
|
||||||
switch m.Type {
|
|
||||||
case labels.MatchEqual:
|
|
||||||
mType = prompb.LabelMatcher_EQ
|
|
||||||
case labels.MatchNotEqual:
|
|
||||||
mType = prompb.LabelMatcher_NEQ
|
|
||||||
case labels.MatchRegexp:
|
|
||||||
mType = prompb.LabelMatcher_RE
|
|
||||||
case labels.MatchNotRegexp:
|
|
||||||
mType = prompb.LabelMatcher_NRE
|
|
||||||
default:
|
|
||||||
panic("invalid matcher type")
|
|
||||||
}
|
|
||||||
pbMatchers = append(pbMatchers, &prompb.LabelMatcher{
|
|
||||||
Type: mType,
|
|
||||||
Name: string(m.Name),
|
|
||||||
Value: string(m.Value),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return pbMatchers
|
|
||||||
}
|
|
||||||
|
|
||||||
func labelPairsToLabels(labelPairs []*prompb.Label) labels.Labels {
|
return newSeriesSetFilter(seriesSet, added)
|
||||||
result := make(labels.Labels, 0, len(labelPairs))
|
|
||||||
for _, l := range labelPairs {
|
|
||||||
result = append(result, labels.Label{
|
|
||||||
Name: l.Name,
|
|
||||||
Value: l.Value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
sort.Sort(result)
|
|
||||||
return result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type byLabel []storage.Series
|
type byLabel []storage.Series
|
||||||
|
@ -144,94 +101,6 @@ func (q *querier) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// errSeriesSet implements storage.SeriesSet, just returning an error.
|
|
||||||
type errSeriesSet struct {
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (errSeriesSet) Next() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (errSeriesSet) At() storage.Series {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e errSeriesSet) Err() error {
|
|
||||||
return e.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// concreteSeriesSet implements storage.SeriesSet.
|
|
||||||
type concreteSeriesSet struct {
|
|
||||||
cur int
|
|
||||||
series []storage.Series
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *concreteSeriesSet) Next() bool {
|
|
||||||
c.cur++
|
|
||||||
return c.cur-1 < len(c.series)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *concreteSeriesSet) At() storage.Series {
|
|
||||||
return c.series[c.cur-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *concreteSeriesSet) Err() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// concreteSeries implementes storage.Series.
|
|
||||||
type concreteSeries struct {
|
|
||||||
labels labels.Labels
|
|
||||||
samples []*prompb.Sample
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *concreteSeries) Labels() labels.Labels {
|
|
||||||
return c.labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *concreteSeries) Iterator() storage.SeriesIterator {
|
|
||||||
return newConcreteSeriersIterator(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
// concreteSeriesIterator implements storage.SeriesIterator.
|
|
||||||
type concreteSeriesIterator struct {
|
|
||||||
cur int
|
|
||||||
series *concreteSeries
|
|
||||||
}
|
|
||||||
|
|
||||||
func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator {
|
|
||||||
return &concreteSeriesIterator{
|
|
||||||
cur: -1,
|
|
||||||
series: series,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Seek implements storage.SeriesIterator.
|
|
||||||
func (c *concreteSeriesIterator) Seek(t int64) bool {
|
|
||||||
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
|
|
||||||
return c.series.samples[n].Timestamp >= t
|
|
||||||
})
|
|
||||||
return c.cur < len(c.series.samples)
|
|
||||||
}
|
|
||||||
|
|
||||||
// At implements storage.SeriesIterator.
|
|
||||||
func (c *concreteSeriesIterator) At() (t int64, v float64) {
|
|
||||||
s := c.series.samples[c.cur]
|
|
||||||
return s.Timestamp, s.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next implements storage.SeriesIterator.
|
|
||||||
func (c *concreteSeriesIterator) Next() bool {
|
|
||||||
c.cur++
|
|
||||||
return c.cur < len(c.series.samples)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Err implements storage.SeriesIterator.
|
|
||||||
func (c *concreteSeriesIterator) Err() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// addExternalLabels adds matchers for each external label. External labels
|
// addExternalLabels adds matchers for each external label. External labels
|
||||||
// that already have a corresponding user-supplied matcher are skipped, as we
|
// that already have a corresponding user-supplied matcher are skipped, as we
|
||||||
// assume that the user explicitly wants to select a different value for them.
|
// assume that the user explicitly wants to select a different value for them.
|
||||||
|
@ -258,12 +127,38 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match
|
||||||
return matchers, el
|
return matchers, el
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeLabels(l *labels.Labels, toDelete model.LabelSet) {
|
func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet {
|
||||||
for i := 0; i < len(*l); {
|
return &seriesSetFilter{
|
||||||
if _, ok := toDelete[model.LabelName((*l)[i].Name)]; ok {
|
SeriesSet: ss,
|
||||||
*l = (*l)[:i+copy((*l)[i:], (*l)[i+1:])]
|
toFilter: toFilter,
|
||||||
} else {
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type seriesSetFilter struct {
|
||||||
|
storage.SeriesSet
|
||||||
|
toFilter model.LabelSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ssf seriesSetFilter) At() storage.Series {
|
||||||
|
return seriesFilter{
|
||||||
|
Series: ssf.SeriesSet.At(),
|
||||||
|
toFilter: ssf.toFilter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type seriesFilter struct {
|
||||||
|
storage.Series
|
||||||
|
toFilter model.LabelSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sf seriesFilter) Labels() labels.Labels {
|
||||||
|
labels := sf.Series.Labels()
|
||||||
|
for i := 0; i < len(labels); {
|
||||||
|
if _, ok := sf.toFilter[model.LabelName(labels[i].Name)]; ok {
|
||||||
|
labels = labels[:i+copy(labels[i:], labels[i+1:])]
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
|
@ -98,55 +98,38 @@ func TestAddExternalLabels(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoveLabels(t *testing.T) {
|
func TestSeriesSetFilter(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
in labels.Labels
|
in *prompb.QueryResult
|
||||||
out labels.Labels
|
|
||||||
toRemove model.LabelSet
|
toRemove model.LabelSet
|
||||||
|
|
||||||
|
expected *prompb.QueryResult
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
toRemove: model.LabelSet{"foo": "bar"},
|
toRemove: model.LabelSet{"foo": "bar"},
|
||||||
in: labels.FromStrings("foo", "bar", "a", "b"),
|
in: &prompb.QueryResult{
|
||||||
out: labels.FromStrings("a", "b"),
|
Timeseries: []*prompb.TimeSeries{
|
||||||
|
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []*prompb.Sample{}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: &prompb.QueryResult{
|
||||||
|
Timeseries: []*prompb.TimeSeries{
|
||||||
|
{Labels: labelsToLabelsProto(labels.FromStrings("a", "b")), Samples: []*prompb.Sample{}},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, test := range tests {
|
for i, tc := range tests {
|
||||||
in := test.in.Copy()
|
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove)
|
||||||
removeLabels(&in, test.toRemove)
|
have, err := ToQueryResult(filtered)
|
||||||
|
if err != nil {
|
||||||
if !reflect.DeepEqual(in, test.out) {
|
t.Fatal(err)
|
||||||
t.Fatalf("%d. unexpected labels; want %v, got %v", i, test.out, in)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConcreteSeriesSet(t *testing.T) {
|
if !reflect.DeepEqual(have, tc.expected) {
|
||||||
series1 := &concreteSeries{
|
t.Fatalf("%d. unexpected labels; want %v, got %v", i, tc.expected, have)
|
||||||
labels: labels.FromStrings("foo", "bar"),
|
}
|
||||||
samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}},
|
|
||||||
}
|
|
||||||
series2 := &concreteSeries{
|
|
||||||
labels: labels.FromStrings("foo", "baz"),
|
|
||||||
samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}},
|
|
||||||
}
|
|
||||||
c := &concreteSeriesSet{
|
|
||||||
series: []storage.Series{series1, series2},
|
|
||||||
}
|
|
||||||
if !c.Next() {
|
|
||||||
t.Fatalf("Expected Next() to be true.")
|
|
||||||
}
|
|
||||||
if c.At() != series1 {
|
|
||||||
t.Fatalf("Unexpected series returned.")
|
|
||||||
}
|
|
||||||
if !c.Next() {
|
|
||||||
t.Fatalf("Expected Next() to be true.")
|
|
||||||
}
|
|
||||||
if c.At() != series2 {
|
|
||||||
t.Fatalf("Unexpected series returned.")
|
|
||||||
}
|
|
||||||
if c.Next() {
|
|
||||||
t.Fatalf("Expected Next() to be false.")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,11 +167,11 @@ func TestRemoteStorageQuerier(t *testing.T) {
|
||||||
s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil })
|
s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil })
|
||||||
s.clients = []*Client{}
|
s.clients = []*Client{}
|
||||||
for _, readRecent := range test.readRecentClients {
|
for _, readRecent := range test.readRecentClients {
|
||||||
c, _ := NewClient(0, &clientConfig{
|
c, _ := NewClient(0, &ClientConfig{
|
||||||
url: nil,
|
URL: nil,
|
||||||
timeout: model.Duration(30 * time.Second),
|
Timeout: model.Duration(30 * time.Second),
|
||||||
httpClientConfig: config.HTTPClientConfig{},
|
HTTPClientConfig: config.HTTPClientConfig{},
|
||||||
readRecent: readRecent,
|
ReadRecent: readRecent,
|
||||||
})
|
})
|
||||||
s.clients = append(s.clients, c)
|
s.clients = append(s.clients, c)
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,17 +58,17 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
||||||
// TODO: we should only stop & recreate queues which have changes,
|
// TODO: we should only stop & recreate queues which have changes,
|
||||||
// as this can be quite disruptive.
|
// as this can be quite disruptive.
|
||||||
for i, rwConf := range conf.RemoteWriteConfigs {
|
for i, rwConf := range conf.RemoteWriteConfigs {
|
||||||
c, err := NewClient(i, &clientConfig{
|
c, err := NewClient(i, &ClientConfig{
|
||||||
url: rwConf.URL,
|
URL: rwConf.URL,
|
||||||
timeout: rwConf.RemoteTimeout,
|
Timeout: rwConf.RemoteTimeout,
|
||||||
httpClientConfig: rwConf.HTTPClientConfig,
|
HTTPClientConfig: rwConf.HTTPClientConfig,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newQueues = append(newQueues, NewQueueManager(
|
newQueues = append(newQueues, NewQueueManager(
|
||||||
s.logger,
|
s.logger,
|
||||||
defaultQueueManagerConfig,
|
config.DefaultQueueConfig,
|
||||||
conf.GlobalConfig.ExternalLabels,
|
conf.GlobalConfig.ExternalLabels,
|
||||||
rwConf.WriteRelabelConfigs,
|
rwConf.WriteRelabelConfigs,
|
||||||
c,
|
c,
|
||||||
|
@ -88,11 +88,11 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
||||||
|
|
||||||
clients := []*Client{}
|
clients := []*Client{}
|
||||||
for i, rrConf := range conf.RemoteReadConfigs {
|
for i, rrConf := range conf.RemoteReadConfigs {
|
||||||
c, err := NewClient(i, &clientConfig{
|
c, err := NewClient(i, &ClientConfig{
|
||||||
url: rrConf.URL,
|
URL: rrConf.URL,
|
||||||
timeout: rrConf.RemoteTimeout,
|
Timeout: rrConf.RemoteTimeout,
|
||||||
httpClientConfig: rrConf.HTTPClientConfig,
|
HTTPClientConfig: rrConf.HTTPClientConfig,
|
||||||
readRecent: rrConf.ReadRecent,
|
ReadRecent: rrConf.ReadRecent,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -38,14 +38,6 @@ func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelsToMetric(ls labels.Labels) model.Metric {
|
|
||||||
metric := make(model.Metric, len(ls))
|
|
||||||
for _, l := range ls {
|
|
||||||
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
|
|
||||||
}
|
|
||||||
return metric
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddFast implements storage.Appender.
|
// AddFast implements storage.Appender.
|
||||||
func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error {
|
func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error {
|
||||||
_, err := s.Add(l, t, v)
|
_, err := s.Add(l, t, v)
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -31,9 +32,11 @@ import (
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/retrieval"
|
"github.com/prometheus/prometheus/retrieval"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
"github.com/prometheus/prometheus/util/httputil"
|
"github.com/prometheus/prometheus/util/httputil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -161,6 +164,7 @@ func (api *API) Register(r *route.Router) {
|
||||||
r.Get("/alertmanagers", instr("alertmanagers", api.alertmanagers))
|
r.Get("/alertmanagers", instr("alertmanagers", api.alertmanagers))
|
||||||
|
|
||||||
r.Get("/status/config", instr("config", api.serveConfig))
|
r.Get("/status/config", instr("config", api.serveConfig))
|
||||||
|
r.Post("/read", api.ready(prometheus.InstrumentHandler("read", http.HandlerFunc(api.remoteRead))))
|
||||||
}
|
}
|
||||||
|
|
||||||
type queryData struct {
|
type queryData struct {
|
||||||
|
@ -451,6 +455,105 @@ func (api *API) serveConfig(r *http.Request) (interface{}, *apiError) {
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
|
req, err := remote.DecodeReadRequest(r)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := prompb.ReadResponse{
|
||||||
|
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||||
|
}
|
||||||
|
for i, query := range req.Queries {
|
||||||
|
from, through, matchers, err := remote.FromQuery(query)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
querier, err := api.Queryable.Querier(r.Context(), from, through)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer querier.Close()
|
||||||
|
|
||||||
|
// Change equality matchers which match external labels
|
||||||
|
// to a matcher that looks for an empty label,
|
||||||
|
// as that label should not be present in the storage.
|
||||||
|
externalLabels := api.config().GlobalConfig.ExternalLabels.Clone()
|
||||||
|
filteredMatchers := make([]*labels.Matcher, 0, len(matchers))
|
||||||
|
for _, m := range matchers {
|
||||||
|
value := externalLabels[model.LabelName(m.Name)]
|
||||||
|
if m.Type == labels.MatchEqual && value == model.LabelValue(m.Value) {
|
||||||
|
matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "")
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filteredMatchers = append(filteredMatchers, matcher)
|
||||||
|
} else {
|
||||||
|
filteredMatchers = append(filteredMatchers, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...))
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add external labels back in, in sorted order.
|
||||||
|
sortedExternalLabels := make([]*prompb.Label, 0, len(externalLabels))
|
||||||
|
for name, value := range externalLabels {
|
||||||
|
sortedExternalLabels = append(sortedExternalLabels, &prompb.Label{
|
||||||
|
Name: string(name),
|
||||||
|
Value: string(value),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Slice(sortedExternalLabels, func(i, j int) bool {
|
||||||
|
return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, ts := range resp.Results[i].Timeseries {
|
||||||
|
ts.Labels = mergeLabels(ts.Labels, sortedExternalLabels)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := remote.EncodeReadResponse(&resp, w); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeLabels merges two sets of sorted proto labels, preferring those in
|
||||||
|
// primary to those in secondary when there is an overlap.
|
||||||
|
func mergeLabels(primary, secondary []*prompb.Label) []*prompb.Label {
|
||||||
|
result := make([]*prompb.Label, 0, len(primary)+len(secondary))
|
||||||
|
i, j := 0, 0
|
||||||
|
for i < len(primary) && j < len(secondary) {
|
||||||
|
if primary[i].Name < secondary[j].Name {
|
||||||
|
result = append(result, primary[i])
|
||||||
|
i++
|
||||||
|
} else if primary[i].Name > secondary[j].Name {
|
||||||
|
result = append(result, secondary[j])
|
||||||
|
j++
|
||||||
|
} else {
|
||||||
|
result = append(result, primary[i])
|
||||||
|
i++
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for ; i < len(primary); i++ {
|
||||||
|
result = append(result, primary[i])
|
||||||
|
}
|
||||||
|
for ; j < len(secondary); j++ {
|
||||||
|
result = append(result, secondary[j])
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
func respond(w http.ResponseWriter, data interface{}) {
|
func respond(w http.ResponseWriter, data interface{}) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package v1
|
package v1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -26,14 +27,18 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/common/route"
|
"github.com/prometheus/common/route"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/retrieval"
|
"github.com/prometheus/prometheus/retrieval"
|
||||||
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
type targetRetrieverFunc func() []*retrieval.Target
|
type targetRetrieverFunc func() []*retrieval.Target
|
||||||
|
@ -476,6 +481,102 @@ func TestEndpoints(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadEndpoint(t *testing.T) {
|
||||||
|
suite, err := promql.NewTest(t, `
|
||||||
|
load 1m
|
||||||
|
test_metric1{foo="bar",baz="qux"} 1
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer suite.Close()
|
||||||
|
|
||||||
|
if err := suite.Run(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
api := &API{
|
||||||
|
Queryable: suite.Storage(),
|
||||||
|
QueryEngine: suite.QueryEngine(),
|
||||||
|
config: func() config.Config {
|
||||||
|
return config.Config{
|
||||||
|
GlobalConfig: config.GlobalConfig{
|
||||||
|
ExternalLabels: model.LabelSet{
|
||||||
|
"baz": "a",
|
||||||
|
"b": "c",
|
||||||
|
"d": "e",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode the request.
|
||||||
|
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
|
||||||
|
data, err := proto.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
compressed := snappy.Encode(nil, data)
|
||||||
|
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
api.remoteRead(recorder, request)
|
||||||
|
|
||||||
|
// Decode the response.
|
||||||
|
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
uncompressed, err := snappy.Decode(nil, compressed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp prompb.ReadResponse
|
||||||
|
err = proto.Unmarshal(uncompressed, &resp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Results) != 1 {
|
||||||
|
t.Fatalf("Expected 1 result, got %d", len(resp.Results))
|
||||||
|
}
|
||||||
|
|
||||||
|
result := resp.Results[0]
|
||||||
|
expected := &prompb.QueryResult{
|
||||||
|
Timeseries: []*prompb.TimeSeries{
|
||||||
|
{
|
||||||
|
Labels: []*prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar"},
|
||||||
|
},
|
||||||
|
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(result, expected) {
|
||||||
|
t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRespondSuccess(t *testing.T) {
|
func TestRespondSuccess(t *testing.T) {
|
||||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
respond(w, "test")
|
respond(w, "test")
|
||||||
|
|
Loading…
Reference in a new issue