diff --git a/storage/remote/client.go b/storage/remote/client.go index b2c054ac48..e5b5bf3245 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -29,6 +29,7 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/util/httputil" ) @@ -73,29 +74,7 @@ type recoverableError struct { // Store sends a batch of samples to the HTTP endpoint. func (c *Client) Store(samples model.Samples) error { - req := &prompb.WriteRequest{ - Timeseries: make([]*prompb.TimeSeries, 0, len(samples)), - } - for _, s := range samples { - ts := &prompb.TimeSeries{ - Labels: make([]*prompb.Label, 0, len(s.Metric)), - } - for k, v := range s.Metric { - ts.Labels = append(ts.Labels, - &prompb.Label{ - Name: string(k), - Value: string(v), - }) - } - ts.Samples = []*prompb.Sample{ - { - Value: float64(s.Value), - Timestamp: int64(s.Timestamp), - }, - } - req.Timeseries = append(req.Timeseries, ts) - } - + req := ToWriteRequest(samples) data, err := proto.Marshal(req) if err != nil { return err @@ -143,17 +122,17 @@ func (c Client) Name() string { } // Read reads from a remote endpoint. -func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prompb.LabelMatcher) ([]*prompb.TimeSeries, error) { - req := &prompb.ReadRequest{ - // TODO: Support batching multiple queries into one read request, - // as the protobuf interface allows for it. - Queries: []*prompb.Query{{ - StartTimestampMs: from, - EndTimestampMs: through, - Matchers: matchers, - }}, +func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labels.Matcher) ([]*prompb.TimeSeries, error) { + query, err := ToQuery(from, through, matchers) + if err != nil { + return nil, err } + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{ + query, + }, + } data, err := proto.Marshal(req) if err != nil { return nil, fmt.Errorf("unable to marshal read request: %v", err) diff --git a/storage/remote/codec.go b/storage/remote/codec.go new file mode 100644 index 0000000000..2d5b7bb553 --- /dev/null +++ b/storage/remote/codec.go @@ -0,0 +1,218 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" +) + +// DecodeReadRequest reads a remote.Request from a http.Request. +func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} + +// EncodeReadResponse writes a remote.Response to a http.ResponseWriter. +func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error { + data, err := proto.Marshal(resp) + if err != nil { + return err + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed := snappy.Encode(nil, data) + _, err = w.Write(compressed) + return err +} + +// ToWriteRequest converts an array of samples into a WriteRequest proto. +func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest { + req := &prompb.WriteRequest{ + Timeseries: make([]*prompb.TimeSeries, 0, len(samples)), + } + + for _, s := range samples { + ts := prompb.TimeSeries{ + Labels: ToLabelPairs(s.Metric), + Samples: []*prompb.Sample{ + { + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }, + }, + } + req.Timeseries = append(req.Timeseries, &ts) + } + + return req +} + +// ToQuery builds a Query proto. +func ToQuery(from, to int64, matchers []*labels.Matcher) (*prompb.Query, error) { + ms, err := toLabelMatchers(matchers) + if err != nil { + return nil, err + } + + return &prompb.Query{ + StartTimestampMs: from, + EndTimestampMs: to, + Matchers: ms, + }, nil +} + +// FromQuery unpacks a Query proto. +func FromQuery(req *prompb.Query) (model.Time, model.Time, []*labels.Matcher, error) { + matchers, err := fromLabelMatchers(req.Matchers) + if err != nil { + return 0, 0, nil, err + } + from := model.Time(req.StartTimestampMs) + to := model.Time(req.EndTimestampMs) + return from, to, matchers, nil +} + +// ToQueryResult builds a QueryResult proto. +func ToQueryResult(matrix model.Matrix) *prompb.QueryResult { + resp := &prompb.QueryResult{} + for _, ss := range matrix { + ts := prompb.TimeSeries{ + Labels: ToLabelPairs(ss.Metric), + Samples: make([]*prompb.Sample, 0, len(ss.Values)), + } + for _, s := range ss.Values { + ts.Samples = append(ts.Samples, &prompb.Sample{ + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }) + } + resp.Timeseries = append(resp.Timeseries, &ts) + } + return resp +} + +// FromQueryResult unpacks a QueryResult proto. +func FromQueryResult(resp *prompb.QueryResult) model.Matrix { + m := make(model.Matrix, 0, len(resp.Timeseries)) + for _, ts := range resp.Timeseries { + var ss model.SampleStream + ss.Metric = FromLabelPairs(ts.Labels) + ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) + for _, s := range ts.Samples { + ss.Values = append(ss.Values, model.SamplePair{ + Value: model.SampleValue(s.Value), + Timestamp: model.Time(s.Timestamp), + }) + } + m = append(m, &ss) + } + + return m +} + +func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) { + pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) + for _, m := range matchers { + var mType prompb.LabelMatcher_Type + switch m.Type { + case labels.MatchEqual: + mType = prompb.LabelMatcher_EQ + case labels.MatchNotEqual: + mType = prompb.LabelMatcher_NEQ + case labels.MatchRegexp: + mType = prompb.LabelMatcher_RE + case labels.MatchNotRegexp: + mType = prompb.LabelMatcher_NRE + default: + return nil, fmt.Errorf("invalid matcher type") + } + pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ + Type: mType, + Name: m.Name, + Value: m.Value, + }) + } + return pbMatchers, nil +} + +func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) { + result := make([]*labels.Matcher, 0, len(matchers)) + for _, matcher := range matchers { + var mtype labels.MatchType + switch matcher.Type { + case prompb.LabelMatcher_EQ: + mtype = labels.MatchEqual + case prompb.LabelMatcher_NEQ: + mtype = labels.MatchNotEqual + case prompb.LabelMatcher_RE: + mtype = labels.MatchRegexp + case prompb.LabelMatcher_NRE: + mtype = labels.MatchNotRegexp + default: + return nil, fmt.Errorf("invalid matcher type") + } + matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value) + if err != nil { + return nil, err + } + result = append(result, matcher) + } + return result, nil +} + +// ToLabelPairs builds a []LabelPair from a model.Metric +func ToLabelPairs(metric model.Metric) []*prompb.Label { + labelPairs := make([]*prompb.Label, 0, len(metric)) + for k, v := range metric { + labelPairs = append(labelPairs, &prompb.Label{ + Name: string(k), + Value: string(v), + }) + } + return labelPairs +} + +// FromLabelPairs unpack a []LabelPair to a model.Metric +func FromLabelPairs(labelPairs []*prompb.Label) model.Metric { + metric := make(model.Metric, len(labelPairs)) + for _, l := range labelPairs { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} diff --git a/storage/remote/read.go b/storage/remote/read.go index fc2c873aab..37587e5bfa 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -18,7 +18,6 @@ import ( "sort" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -69,8 +68,7 @@ type querier struct { // Select returns a set of series that matches the given label matchers. func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { m, added := q.addExternalLabels(matchers) - - res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m)) + res, err := q.client.Read(context.TODO(), q.mint, q.maxt, m) if err != nil { return errSeriesSet{err: err} } @@ -90,31 +88,6 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { } } -func labelMatchersToProto(matchers []*labels.Matcher) []*prompb.LabelMatcher { - pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) - for _, m := range matchers { - var mType prompb.LabelMatcher_Type - switch m.Type { - case labels.MatchEqual: - mType = prompb.LabelMatcher_EQ - case labels.MatchNotEqual: - mType = prompb.LabelMatcher_NEQ - case labels.MatchRegexp: - mType = prompb.LabelMatcher_RE - case labels.MatchNotRegexp: - mType = prompb.LabelMatcher_NRE - default: - panic("invalid matcher type") - } - pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ - Type: mType, - Name: string(m.Name), - Value: string(m.Value), - }) - } - return pbMatchers -} - func labelPairsToLabels(labelPairs []*prompb.Label) labels.Labels { result := make(labels.Labels, 0, len(labelPairs)) for _, l := range labelPairs {