From febed48703b6f82b54b4e1927c53ab6c46257c2f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 3 Aug 2017 21:03:19 +0100 Subject: [PATCH] Implement remote read server in Prometheus. --- storage/remote/client.go | 88 ++------------- storage/remote/codec.go | 217 +++++++++++++++++++++++++++++++++++++ storage/remote/iterator.go | 13 +++ web/api/v1/api.go | 43 ++++++++ 4 files changed, 281 insertions(+), 80 deletions(-) create mode 100644 storage/remote/codec.go diff --git a/storage/remote/client.go b/storage/remote/client.go index a307c46f3..01bcd9c8f 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -76,29 +76,7 @@ type recoverableError struct { // Store sends a batch of samples to the HTTP endpoint. func (c *Client) Store(samples model.Samples) error { - req := &WriteRequest{ - Timeseries: make([]*TimeSeries, 0, len(samples)), - } - for _, s := range samples { - ts := &TimeSeries{ - Labels: make([]*LabelPair, 0, len(s.Metric)), - } - for k, v := range s.Metric { - ts.Labels = append(ts.Labels, - &LabelPair{ - Name: string(k), - Value: string(v), - }) - } - ts.Samples = []*Sample{ - { - Value: float64(s.Value), - TimestampMs: int64(s.Timestamp), - }, - } - req.Timeseries = append(req.Timeseries, ts) - } - + req := ToWriteRequest(samples) data, err := proto.Marshal(req) if err != nil { return err @@ -147,14 +125,15 @@ func (c Client) Name() string { // Read reads from a remote endpoint. func (c *Client) Read(ctx context.Context, from, through model.Time, matchers metric.LabelMatchers) (model.Matrix, error) { + query, err := ToQuery(from, through, matchers) + if err != nil { + return nil, err + } + req := &ReadRequest{ // TODO: Support batching multiple queries into one read request, // as the protobuf interface allows for it. - Queries: []*Query{{ - StartTimestampMs: int64(from), - EndTimestampMs: int64(through), - Matchers: labelMatchersToProto(matchers), - }}, + Queries: []*Query{query}, } data, err := proto.Marshal(req) @@ -203,56 +182,5 @@ func (c *Client) Read(ctx context.Context, from, through model.Time, matchers me return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) } - return matrixFromProto(resp.Results[0].Timeseries), nil -} - -func labelMatchersToProto(matchers metric.LabelMatchers) []*LabelMatcher { - pbMatchers := make([]*LabelMatcher, 0, len(matchers)) - for _, m := range matchers { - var mType MatchType - switch m.Type { - case metric.Equal: - mType = MatchType_EQUAL - case metric.NotEqual: - mType = MatchType_NOT_EQUAL - case metric.RegexMatch: - mType = MatchType_REGEX_MATCH - case metric.RegexNoMatch: - mType = MatchType_REGEX_NO_MATCH - default: - panic("invalid matcher type") - } - pbMatchers = append(pbMatchers, &LabelMatcher{ - Type: mType, - Name: string(m.Name), - Value: string(m.Value), - }) - } - return pbMatchers -} - -func matrixFromProto(seriesSet []*TimeSeries) model.Matrix { - m := make(model.Matrix, 0, len(seriesSet)) - for _, ts := range seriesSet { - var ss model.SampleStream - ss.Metric = labelPairsToMetric(ts.Labels) - ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) - for _, s := range ts.Samples { - ss.Values = append(ss.Values, model.SamplePair{ - Value: model.SampleValue(s.Value), - Timestamp: model.Time(s.TimestampMs), - }) - } - m = append(m, &ss) - } - - return m -} - -func labelPairsToMetric(labelPairs []*LabelPair) model.Metric { - metric := make(model.Metric, len(labelPairs)) - for _, l := range labelPairs { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric + return FromQueryResult(resp.Results[0]), nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go new file mode 100644 index 000000000..e13251c56 --- /dev/null +++ b/storage/remote/codec.go @@ -0,0 +1,217 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/storage/metric" +) + +// DecodeReadRequest reads a remote.Request from a http.Request. +func DecodeReadRequest(r *http.Request) (*ReadRequest, error) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} + +// EncodReadResponse writes a remote.Response to a http.ResponseWriter. +func EncodReadResponse(resp *ReadResponse, w http.ResponseWriter) error { + data, err := proto.Marshal(resp) + if err != nil { + return err + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed := snappy.Encode(nil, data) + _, err = w.Write(compressed) + return err +} + +// ToWriteRequest converts an array of samples into a WriteRequest proto. +func ToWriteRequest(samples []*model.Sample) *WriteRequest { + req := &WriteRequest{ + Timeseries: make([]*TimeSeries, 0, len(samples)), + } + + for _, s := range samples { + ts := TimeSeries{ + Labels: ToLabelPairs(s.Metric), + Samples: []*Sample{ + { + Value: float64(s.Value), + TimestampMs: int64(s.Timestamp), + }, + }, + } + req.Timeseries = append(req.Timeseries, &ts) + } + + return req +} + +// ToQuery builds a Query proto. +func ToQuery(from, to model.Time, matchers []*metric.LabelMatcher) (*Query, error) { + ms, err := toLabelMatchers(matchers) + if err != nil { + return nil, err + } + + return &Query{ + StartTimestampMs: int64(from), + EndTimestampMs: int64(to), + Matchers: ms, + }, nil +} + +// FromQuery unpacks a Query proto. +func FromQuery(req *Query) (model.Time, model.Time, []*metric.LabelMatcher, error) { + matchers, err := fromLabelMatchers(req.Matchers) + if err != nil { + return 0, 0, nil, err + } + from := model.Time(req.StartTimestampMs) + to := model.Time(req.EndTimestampMs) + return from, to, matchers, nil +} + +// ToQueryResult builds a QueryResult proto. +func ToQueryResult(matrix model.Matrix) *QueryResult { + resp := &QueryResult{} + for _, ss := range matrix { + ts := TimeSeries{ + Labels: ToLabelPairs(ss.Metric), + Samples: make([]*Sample, 0, len(ss.Values)), + } + for _, s := range ss.Values { + ts.Samples = append(ts.Samples, &Sample{ + Value: float64(s.Value), + TimestampMs: int64(s.Timestamp), + }) + } + resp.Timeseries = append(resp.Timeseries, &ts) + } + return resp +} + +// FromQueryResult unpacks a QueryResult proto. +func FromQueryResult(resp *QueryResult) model.Matrix { + m := make(model.Matrix, 0, len(resp.Timeseries)) + for _, ts := range resp.Timeseries { + var ss model.SampleStream + ss.Metric = FromLabelPairs(ts.Labels) + ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) + for _, s := range ts.Samples { + ss.Values = append(ss.Values, model.SamplePair{ + Value: model.SampleValue(s.Value), + Timestamp: model.Time(s.TimestampMs), + }) + } + m = append(m, &ss) + } + + return m +} + +func toLabelMatchers(matchers []*metric.LabelMatcher) ([]*LabelMatcher, error) { + result := make([]*LabelMatcher, 0, len(matchers)) + for _, matcher := range matchers { + var mType MatchType + switch matcher.Type { + case metric.Equal: + mType = MatchType_EQUAL + case metric.NotEqual: + mType = MatchType_NOT_EQUAL + case metric.RegexMatch: + mType = MatchType_REGEX_MATCH + case metric.RegexNoMatch: + mType = MatchType_REGEX_NO_MATCH + default: + return nil, fmt.Errorf("invalid matcher type") + } + result = append(result, &LabelMatcher{ + Type: mType, + Name: string(matcher.Name), + Value: string(matcher.Value), + }) + } + return result, nil +} + +func fromLabelMatchers(matchers []*LabelMatcher) ([]*metric.LabelMatcher, error) { + result := make(metric.LabelMatchers, 0, len(matchers)) + for _, matcher := range matchers { + var mtype metric.MatchType + switch matcher.Type { + case MatchType_EQUAL: + mtype = metric.Equal + case MatchType_NOT_EQUAL: + mtype = metric.NotEqual + case MatchType_REGEX_MATCH: + mtype = metric.RegexMatch + case MatchType_REGEX_NO_MATCH: + mtype = metric.RegexNoMatch + default: + return nil, fmt.Errorf("invalid matcher type") + } + matcher, err := metric.NewLabelMatcher(mtype, model.LabelName(matcher.Name), model.LabelValue(matcher.Value)) + if err != nil { + return nil, err + } + result = append(result, matcher) + } + return result, nil +} + +// ToLabelPairs builds a []LabelPair from a model.Metric +func ToLabelPairs(metric model.Metric) []*LabelPair { + labelPairs := make([]*LabelPair, 0, len(metric)) + for k, v := range metric { + labelPairs = append(labelPairs, &LabelPair{ + Name: string(k), + Value: string(v), + }) + } + return labelPairs +} + +// FromLabelPairs unpack a []LabelPair to a model.Metric +func FromLabelPairs(labelPairs []*LabelPair) model.Metric { + metric := make(model.Metric, len(labelPairs)) + for _, l := range labelPairs { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} diff --git a/storage/remote/iterator.go b/storage/remote/iterator.go index 991cb167b..f9badd484 100644 --- a/storage/remote/iterator.go +++ b/storage/remote/iterator.go @@ -17,6 +17,7 @@ import ( "sort" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/metric" ) @@ -62,3 +63,15 @@ func (it sampleStreamIterator) RangeValues(in metric.Interval) []model.SamplePai } func (it sampleStreamIterator) Close() {} + +// IteratorsToMatrix converts a list of iterators into a model.Matrix. +func IteratorsToMatrix(iters []local.SeriesIterator, interval metric.Interval) model.Matrix { + result := make(model.Matrix, 0, len(iters)) + for _, iter := range iters { + result = append(result, &model.SampleStream{ + Metric: iter.Metric().Metric, + Values: iter.RangeValues(interval), + }) + } + return result +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index af80b9f66..e0a73fc77 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/metric" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/util/httputil" ) @@ -152,6 +153,7 @@ func (api *API) Register(r *route.Router) { r.Get("/alertmanagers", instr("alertmanagers", api.alertmanagers)) r.Get("/status/config", instr("config", api.serveConfig)) + r.Post("/read", prometheus.InstrumentHandler("read", http.HandlerFunc(api.remoteRead))) } type queryData struct { @@ -452,6 +454,47 @@ func (api *API) serveConfig(r *http.Request) (interface{}, *apiError) { return cfg, nil } +func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { + req, err := remote.DecodeReadRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + resp := remote.ReadResponse{ + Results: make([]*remote.QueryResult, len(req.Queries)), + } + for i, query := range req.Queries { + querier, err := api.Storage.Querier() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer querier.Close() + + from, through, matchers, err := remote.FromQuery(query) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + iters, err := querier.QueryRange(r.Context(), from, through, matchers...) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + resp.Results[i] = remote.ToQueryResult(remote.IteratorsToMatrix(iters, metric.Interval{ + OldestInclusive: from, + NewestInclusive: through, + })) + } + + if err := remote.EncodReadResponse(&resp, w); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + func respond(w http.ResponseWriter, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK)