From ce97cdd477fcea8077d00c63ef9e279f64e45add Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 26 Feb 2021 16:43:19 +0000 Subject: [PATCH] Move remote read handler to remote package. (#8536) * Move remote read handler to remote package. This follows the pattern I started with the remote write handler. The api/v1 package is getting pretty cluttered. Moving code to other packages helps reduce this size and also makes it reusable - eg Cortex doesn't do streaming remote writes yet, and will very soon. Signed-off-by: Tom Wilkie * Deal with a nil remoteReadHandler for tests. Signed-off-by: Tom Wilkie * Remove the global metrics. Signed-off-by: Tom Wilkie * Fix test. Signed-off-by: Tom Wilkie * Review feedback. Signed-off-by: Tom Wilkie --- storage/remote/read_handler.go | 272 +++++++++++++++++++++++ storage/remote/read_handler_test.go | 331 ++++++++++++++++++++++++++++ storage/remote/write_handler.go | 8 +- web/api/v1/api.go | 281 +++-------------------- web/api/v1/api_test.go | 313 -------------------------- web/web.go | 1 + 6 files changed, 641 insertions(+), 565 deletions(-) create mode 100644 storage/remote/read_handler.go create mode 100644 storage/remote/read_handler_test.go diff --git a/storage/remote/read_handler.go b/storage/remote/read_handler.go new file mode 100644 index 0000000000..e5c8ea9ee3 --- /dev/null +++ b/storage/remote/read_handler.go @@ -0,0 +1,272 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "net/http" + "sort" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/gate" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +type readHandler struct { + logger log.Logger + queryable storage.SampleAndChunkQueryable + config func() config.Config + remoteReadSampleLimit int + remoteReadMaxBytesInFrame int + remoteReadGate *gate.Gate + queries prometheus.Gauge +} + +// NewReadHandler creates a http.Handler that accepts remote read requests and +// writes them to the provided queryable. +func NewReadHandler(logger log.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler { + h := &readHandler{ + logger: logger, + queryable: queryable, + config: config, + remoteReadSampleLimit: remoteReadSampleLimit, + remoteReadGate: gate.New(remoteReadConcurrencyLimit), + remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame, + + queries: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "prometheus", + Subsystem: "api", // TODO: changes to storage in Prometheus 3.0. + Name: "remote_read_queries", + Help: "The current number of remote read queries being executed or waiting.", + }), + } + if r != nil { + r.MustRegister(h.queries) + } + return h +} + +func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + if err := h.remoteReadGate.Start(ctx); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + h.queries.Inc() + + defer h.remoteReadGate.Done() + defer h.queries.Dec() + + req, err := DecodeReadRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + externalLabels := h.config().GlobalConfig.ExternalLabels.Map() + + sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) + for name, value := range externalLabels { + sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ + Name: name, + Value: value, + }) + } + sort.Slice(sortedExternalLabels, func(i, j int) bool { + return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name + }) + + responseType, err := NegotiateResponseType(req.AcceptedResponseTypes) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + switch responseType { + case prompb.ReadRequest_STREAMED_XOR_CHUNKS: + h.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels) + default: + // On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response. + h.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels) + } +} + +func (h *readHandler) remoteReadSamples( + ctx context.Context, + w http.ResponseWriter, + req *prompb.ReadRequest, + externalLabels map[string]string, + sortedExternalLabels []prompb.Label, +) { + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + resp := prompb.ReadResponse{ + Results: make([]*prompb.QueryResult, len(req.Queries)), + } + for i, query := range req.Queries { + if err := func() error { + filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) + if err != nil { + return err + } + + querier, err := h.queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + return err + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(h.logger).Log("msg", "Error on querier close", "err", err.Error()) + } + }() + + var hints *storage.SelectHints + if query.Hints != nil { + hints = &storage.SelectHints{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + Grouping: query.Hints.Grouping, + Range: query.Hints.RangeMs, + By: query.Hints.By, + } + } + + var ws storage.Warnings + resp.Results[i], ws, err = ToQueryResult(querier.Select(false, hints, filteredMatchers...), h.remoteReadSampleLimit) + if err != nil { + return err + } + for _, w := range ws { + level.Warn(h.logger).Log("msg", "Warnings on remote read query", "err", w.Error()) + } + for _, ts := range resp.Results[i].Timeseries { + ts.Labels = MergeLabels(ts.Labels, sortedExternalLabels) + } + return nil + }(); err != nil { + if httpErr, ok := err.(HTTPError); ok { + http.Error(w, httpErr.Error(), httpErr.Status()) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + if err := EncodeReadResponse(&resp, w); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) { + w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError) + return + } + + for i, query := range req.Queries { + if err := func() error { + filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) + if err != nil { + return err + } + + querier, err := h.queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + return err + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error()) + } + }() + + var hints *storage.SelectHints + if query.Hints != nil { + hints = &storage.SelectHints{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + Grouping: query.Hints.Grouping, + Range: query.Hints.RangeMs, + By: query.Hints.By, + } + } + + ws, err := StreamChunkedReadResponses( + NewChunkedWriter(w, f), + int64(i), + // The streaming API has to provide the series sorted. + querier.Select(true, hints, filteredMatchers...), + sortedExternalLabels, + h.remoteReadMaxBytesInFrame, + ) + if err != nil { + return err + } + + for _, w := range ws { + level.Warn(h.logger).Log("msg", "Warnings on chunked remote read query", "warnings", w.Error()) + } + return nil + }(); err != nil { + if httpErr, ok := err.(HTTPError); ok { + http.Error(w, httpErr.Error(), httpErr.Status()) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +// filterExtLabelsFromMatchers change equality matchers which match external labels +// to a matcher that looks for an empty label, +// as that label should not be present in the storage. +func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) { + matchers, err := FromLabelMatchers(pbMatchers) + if err != nil { + return nil, err + } + + filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) + for _, m := range matchers { + value := externalLabels[m.Name] + if m.Type == labels.MatchEqual && value == m.Value { + matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") + if err != nil { + return nil, err + } + filteredMatchers = append(filteredMatchers, matcher) + } else { + filteredMatchers = append(filteredMatchers, m) + } + } + + return filteredMatchers, nil +} diff --git a/storage/remote/read_handler_test.go b/storage/remote/read_handler_test.go new file mode 100644 index 0000000000..2bc8a34290 --- /dev/null +++ b/storage/remote/read_handler_test.go @@ -0,0 +1,331 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" +) + +func TestSampledReadEndpoint(t *testing.T) { + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar",baz="qux"} 1 + `) + require.NoError(t, err) + + defer suite.Close() + + err = suite.Run() + require.NoError(t, err) + + h := NewReadHandler(nil, nil, suite.Storage(), func() config.Config { + return config.Config{ + GlobalConfig: config.GlobalConfig{ + ExternalLabels: labels.Labels{ + // We expect external labels to be added, with the source labels honored. + {Name: "baz", Value: "a"}, + {Name: "b", Value: "c"}, + {Name: "d", Value: "e"}, + }, + }, + } + }, 1e6, 1, 0) + + // Encode the request. + matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") + require.NoError(t, err) + + matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") + require.NoError(t, err) + + query, err := ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"}) + require.NoError(t, err) + + req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} + data, err := proto.Marshal(req) + require.NoError(t, err) + + compressed := snappy.Encode(nil, data) + request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + h.ServeHTTP(recorder, request) + + if recorder.Code/100 != 2 { + t.Fatal(recorder.Code) + } + + require.Equal(t, "application/x-protobuf", recorder.Result().Header.Get("Content-Type")) + require.Equal(t, "snappy", recorder.Result().Header.Get("Content-Encoding")) + + // Decode the response. + compressed, err = ioutil.ReadAll(recorder.Result().Body) + require.NoError(t, err) + + uncompressed, err := snappy.Decode(nil, compressed) + require.NoError(t, err) + + var resp prompb.ReadResponse + err = proto.Unmarshal(uncompressed, &resp) + require.NoError(t, err) + + if len(resp.Results) != 1 { + t.Fatalf("Expected 1 result, got %d", len(resp.Results)) + } + + require.Equal(t, &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }, + }, + }, resp.Results[0]) +} + +func TestStreamReadEndpoint(t *testing.T) { + // First with 120 samples. We expect 1 frame with 1 chunk. + // Second with 121 samples, We expect 1 frame with 2 chunks. + // Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit. + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar1",baz="qux"} 0+100x119 + test_metric1{foo="bar2",baz="qux"} 0+100x120 + test_metric1{foo="bar3",baz="qux"} 0+100x240 + `) + require.NoError(t, err) + + defer suite.Close() + + require.NoError(t, suite.Run()) + + api := NewReadHandler(nil, nil, suite.Storage(), func() config.Config { + return config.Config{ + GlobalConfig: config.GlobalConfig{ + ExternalLabels: labels.Labels{ + // We expect external labels to be added, with the source labels honored. + {Name: "baz", Value: "a"}, + {Name: "b", Value: "c"}, + {Name: "d", Value: "e"}, + }, + }, + } + }, + 1e6, 1, + // Labelset has 57 bytes. Full chunk in test data has roughly 240 bytes. This allows us to have at max 2 chunks in this test. + 57+480, + ) + + // Encode the request. + matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") + require.NoError(t, err) + + matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") + require.NoError(t, err) + + matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") + require.NoError(t, err) + + query1, err := ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{ + Step: 1, + Func: "avg", + Start: 0, + End: 14400001, + }) + require.NoError(t, err) + + query2, err := ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{ + Step: 1, + Func: "avg", + Start: 0, + End: 14400001, + }) + require.NoError(t, err) + + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{query1, query2}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + } + data, err := proto.Marshal(req) + require.NoError(t, err) + + compressed := snappy.Encode(nil, data) + request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + api.ServeHTTP(recorder, request) + + if recorder.Code/100 != 2 { + t.Fatal(recorder.Code) + } + + require.Equal(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type")) + require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding")) + + var results []*prompb.ChunkedReadResponse + stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil) + for { + res := &prompb.ChunkedReadResponse{} + err := stream.NextProto(res) + if err == io.EOF { + break + } + require.NoError(t, err) + results = append(results, res) + } + + if len(results) != 5 { + t.Fatalf("Expected 5 result, got %d", len(results)) + } + + require.Equal(t, []*prompb.ChunkedReadResponse{ + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar1"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + MaxTimeMs: 7140000, + Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar2"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + MaxTimeMs: 7140000, + Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), + }, + { + Type: prompb.Chunk_XOR, + MinTimeMs: 7200000, + MaxTimeMs: 7200000, + Data: []byte("\000\001\200\364\356\006@\307p\000\000\000\000\000\000"), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar3"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + MaxTimeMs: 7140000, + Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), + }, + { + Type: prompb.Chunk_XOR, + MinTimeMs: 7200000, + MaxTimeMs: 14340000, + Data: []byte("\000x\200\364\356\006@\307p\000\000\000\000\000\340\324\003\340>\224\355\260\277\322\200\372\005(=\240R\207:\003(\025\240\362\201z\003(\365\240r\203:\005(\r\241\322\201\372\r(\r\240R\237:\007(5\2402\201z\037(\025\2402\203:\005(\375\240R\200\372\r(\035\241\322\201:\003(5\240r\326g\364\271\213\227!\253q\037\312N\340GJ\033E)\375\024\241\266\362}(N\217(V\203)\336\207(\326\203(N\334W\322\203\2644\240}\005(\373AJ\031\3202\202\264\374\240\275\003(kA\3129\320R\201\2644\240\375\264\277\322\200\332\005(3\240r\207Z\003(\027\240\362\201Z\003(\363\240R\203\332\005(\017\241\322\201\332\r(\023\2402\237Z\007(7\2402\201Z\037(\023\240\322\200\332\005(\377\240R\200\332\r "), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar3"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + MinTimeMs: 14400000, + MaxTimeMs: 14400000, + Data: []byte("\000\001\200\350\335\r@\327p\000\000\000\000\000\000"), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar1"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + MaxTimeMs: 7140000, + Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), + }, + }, + }, + }, + QueryIndex: 1, + }, + }, results) +} diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 435c996076..20e2cf4512 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -23,7 +23,7 @@ import ( "github.com/prometheus/prometheus/storage" ) -type handler struct { +type writeHandler struct { logger log.Logger appendable storage.Appendable } @@ -31,13 +31,13 @@ type handler struct { // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { - return &handler{ + return &writeHandler{ logger: logger, appendable: appendable, } } -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { req, err := DecodeWriteRequest(r.Body) if err != nil { level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) @@ -62,7 +62,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { +func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { app := h.appendable.Appender(ctx) defer func() { if err != nil { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 81980ae47a..b6a6fcdb5b 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -39,11 +39,9 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" - "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" @@ -56,11 +54,6 @@ import ( "github.com/prometheus/prometheus/util/stats" ) -const ( - namespace = "prometheus" - subsystem = "api" -) - type status string const ( @@ -83,12 +76,6 @@ const ( var ( LocalhostRepresentations = []string{"127.0.0.1", "localhost", "::1"} - remoteReadQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "remote_read_queries", - Help: "The current number of remote read queries being executed or waiting.", - }) ) type apiError struct { @@ -183,23 +170,21 @@ type API struct { ready func(http.HandlerFunc) http.HandlerFunc globalURLOptions GlobalURLOptions - db TSDBAdminStats - dbDir string - enableAdmin bool - logger log.Logger - remoteReadSampleLimit int - remoteReadMaxBytesInFrame int - remoteReadGate *gate.Gate - CORSOrigin *regexp.Regexp - buildInfo *PrometheusVersion - runtimeInfo func() (RuntimeInfo, error) - gatherer prometheus.Gatherer - remoteWriteHandler http.Handler + db TSDBAdminStats + dbDir string + enableAdmin bool + logger log.Logger + CORSOrigin *regexp.Regexp + buildInfo *PrometheusVersion + runtimeInfo func() (RuntimeInfo, error) + gatherer prometheus.Gatherer + + remoteWriteHandler http.Handler + remoteReadHandler http.Handler } func init() { jsoniter.RegisterTypeEncoderFunc("promql.Point", marshalPointJSON, marshalPointJSONIsEmpty) - prometheus.MustRegister(remoteReadQueries) } // NewAPI returns an initialized API type. @@ -225,6 +210,7 @@ func NewAPI( runtimeInfo func() (RuntimeInfo, error), buildInfo *PrometheusVersion, gatherer prometheus.Gatherer, + registerer prometheus.Registerer, ) *API { a := &API{ QueryEngine: qe, @@ -233,23 +219,22 @@ func NewAPI( targetRetriever: tr, alertmanagerRetriever: ar, - now: time.Now, - config: configFunc, - flagsMap: flagsMap, - ready: readyFunc, - globalURLOptions: globalURLOptions, - db: db, - dbDir: dbDir, - enableAdmin: enableAdmin, - rulesRetriever: rr, - remoteReadSampleLimit: remoteReadSampleLimit, - remoteReadGate: gate.New(remoteReadConcurrencyLimit), - remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame, - logger: logger, - CORSOrigin: CORSOrigin, - runtimeInfo: runtimeInfo, - buildInfo: buildInfo, - gatherer: gatherer, + now: time.Now, + config: configFunc, + flagsMap: flagsMap, + ready: readyFunc, + globalURLOptions: globalURLOptions, + db: db, + dbDir: dbDir, + enableAdmin: enableAdmin, + rulesRetriever: rr, + logger: logger, + CORSOrigin: CORSOrigin, + runtimeInfo: runtimeInfo, + buildInfo: buildInfo, + gatherer: gatherer, + + remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame), } if ap != nil { @@ -331,7 +316,6 @@ func (api *API) Register(r *route.Router) { r.Put("/admin/tsdb/delete_series", wrap(api.deleteSeries)) r.Put("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones)) r.Put("/admin/tsdb/snapshot", wrap(api.snapshot)) - } type queryData struct { @@ -1319,211 +1303,12 @@ func (api *API) serveTSDBStatus(*http.Request) apiFuncResult { } func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - if err := api.remoteReadGate.Start(ctx); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + // This is only really for tests - this will never be nil IRL. + if api.remoteReadHandler != nil { + api.remoteReadHandler.ServeHTTP(w, r) + } else { + http.Error(w, "not found", http.StatusNotFound) } - remoteReadQueries.Inc() - - defer api.remoteReadGate.Done() - defer remoteReadQueries.Dec() - - req, err := remote.DecodeReadRequest(r) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - externalLabels := api.config().GlobalConfig.ExternalLabels.Map() - - sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) - for name, value := range externalLabels { - sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ - Name: name, - Value: value, - }) - } - sort.Slice(sortedExternalLabels, func(i, j int) bool { - return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name - }) - - responseType, err := remote.NegotiateResponseType(req.AcceptedResponseTypes) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - switch responseType { - case prompb.ReadRequest_STREAMED_XOR_CHUNKS: - api.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels) - default: - // On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response. - api.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels) - } -} - -func (api *API) remoteReadSamples( - ctx context.Context, - w http.ResponseWriter, - req *prompb.ReadRequest, - externalLabels map[string]string, - sortedExternalLabels []prompb.Label, -) { - w.Header().Set("Content-Type", "application/x-protobuf") - w.Header().Set("Content-Encoding", "snappy") - - resp := prompb.ReadResponse{ - Results: make([]*prompb.QueryResult, len(req.Queries)), - } - for i, query := range req.Queries { - if err := func() error { - filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) - if err != nil { - return err - } - - querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) - if err != nil { - return err - } - defer func() { - if err := querier.Close(); err != nil { - level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error()) - } - }() - - var hints *storage.SelectHints - if query.Hints != nil { - hints = &storage.SelectHints{ - Start: query.Hints.StartMs, - End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, - Grouping: query.Hints.Grouping, - Range: query.Hints.RangeMs, - By: query.Hints.By, - } - } - - var ws storage.Warnings - resp.Results[i], ws, err = remote.ToQueryResult(querier.Select(false, hints, filteredMatchers...), api.remoteReadSampleLimit) - if err != nil { - return err - } - for _, w := range ws { - level.Warn(api.logger).Log("msg", "Warnings on remote read query", "err", w.Error()) - } - for _, ts := range resp.Results[i].Timeseries { - ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels) - } - return nil - }(); err != nil { - if httpErr, ok := err.(remote.HTTPError); ok { - http.Error(w, httpErr.Error(), httpErr.Status()) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - if err := remote.EncodeReadResponse(&resp, w); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) { - w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") - - f, ok := w.(http.Flusher) - if !ok { - http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError) - return - } - - for i, query := range req.Queries { - if err := func() error { - filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) - if err != nil { - return err - } - - querier, err := api.Queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs) - if err != nil { - return err - } - defer func() { - if err := querier.Close(); err != nil { - level.Warn(api.logger).Log("msg", "Error on chunk querier close", "err", err.Error()) - } - }() - - var hints *storage.SelectHints - if query.Hints != nil { - hints = &storage.SelectHints{ - Start: query.Hints.StartMs, - End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, - Grouping: query.Hints.Grouping, - Range: query.Hints.RangeMs, - By: query.Hints.By, - } - } - - ws, err := remote.StreamChunkedReadResponses( - remote.NewChunkedWriter(w, f), - int64(i), - // The streaming API has to provide the series sorted. - querier.Select(true, hints, filteredMatchers...), - sortedExternalLabels, - api.remoteReadMaxBytesInFrame, - ) - if err != nil { - return err - } - - for _, w := range ws { - level.Warn(api.logger).Log("msg", "Warnings on chunked remote read query", "warnings", w.Error()) - } - return nil - }(); err != nil { - if httpErr, ok := err.(remote.HTTPError); ok { - http.Error(w, httpErr.Error(), httpErr.Status()) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } -} - -// filterExtLabelsFromMatchers change equality matchers which match external labels -// to a matcher that looks for an empty label, -// as that label should not be present in the storage. -func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) { - matchers, err := remote.FromLabelMatchers(pbMatchers) - if err != nil { - return nil, err - } - - filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) - for _, m := range matchers { - value := externalLabels[m.Name] - if m.Type == labels.MatchEqual && value == m.Value { - matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") - if err != nil { - return nil, err - } - filteredMatchers = append(filteredMatchers, matcher) - } else { - filteredMatchers = append(filteredMatchers, m) - } - } - - return filteredMatchers, nil } func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index edcaf95c7e..42819236fa 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -14,11 +14,9 @@ package v1 import ( - "bytes" "context" "encoding/json" "fmt" - "io" "io/ioutil" "math" "net/http" @@ -33,8 +31,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" @@ -44,7 +40,6 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" @@ -1954,314 +1949,6 @@ func assertAPIResponseLength(t *testing.T, got interface{}, expLen int) { } } -func TestSampledReadEndpoint(t *testing.T) { - suite, err := promql.NewTest(t, ` - load 1m - test_metric1{foo="bar",baz="qux"} 1 - `) - require.NoError(t, err) - - defer suite.Close() - - err = suite.Run() - require.NoError(t, err) - - api := &API{ - Queryable: suite.Storage(), - QueryEngine: suite.QueryEngine(), - config: func() config.Config { - return config.Config{ - GlobalConfig: config.GlobalConfig{ - ExternalLabels: labels.Labels{ - // We expect external labels to be added, with the source labels honored. - {Name: "baz", Value: "a"}, - {Name: "b", Value: "c"}, - {Name: "d", Value: "e"}, - }, - }, - } - }, - remoteReadSampleLimit: 1e6, - remoteReadGate: gate.New(1), - } - - // Encode the request. - matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") - require.NoError(t, err) - - matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") - require.NoError(t, err) - - query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"}) - require.NoError(t, err) - - req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} - data, err := proto.Marshal(req) - require.NoError(t, err) - - compressed := snappy.Encode(nil, data) - request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) - require.NoError(t, err) - - recorder := httptest.NewRecorder() - api.remoteRead(recorder, request) - - if recorder.Code/100 != 2 { - t.Fatal(recorder.Code) - } - - require.Equal(t, "application/x-protobuf", recorder.Result().Header.Get("Content-Type")) - require.Equal(t, "snappy", recorder.Result().Header.Get("Content-Encoding")) - - // Decode the response. - compressed, err = ioutil.ReadAll(recorder.Result().Body) - require.NoError(t, err) - - uncompressed, err := snappy.Decode(nil, compressed) - require.NoError(t, err) - - var resp prompb.ReadResponse - err = proto.Unmarshal(uncompressed, &resp) - require.NoError(t, err) - - if len(resp.Results) != 1 { - t.Fatalf("Expected 1 result, got %d", len(resp.Results)) - } - - require.Equal(t, &prompb.QueryResult{ - Timeseries: []*prompb.TimeSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar"}, - }, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }, - }, - }, resp.Results[0]) -} - -func TestStreamReadEndpoint(t *testing.T) { - // First with 120 samples. We expect 1 frame with 1 chunk. - // Second with 121 samples, We expect 1 frame with 2 chunks. - // Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit. - suite, err := promql.NewTest(t, ` - load 1m - test_metric1{foo="bar1",baz="qux"} 0+100x119 - test_metric1{foo="bar2",baz="qux"} 0+100x120 - test_metric1{foo="bar3",baz="qux"} 0+100x240 - `) - require.NoError(t, err) - - defer suite.Close() - - require.NoError(t, suite.Run()) - - api := &API{ - Queryable: suite.Storage(), - QueryEngine: suite.QueryEngine(), - config: func() config.Config { - return config.Config{ - GlobalConfig: config.GlobalConfig{ - ExternalLabels: labels.Labels{ - // We expect external labels to be added, with the source labels honored. - {Name: "baz", Value: "a"}, - {Name: "b", Value: "c"}, - {Name: "d", Value: "e"}, - }, - }, - } - }, - remoteReadSampleLimit: 1e6, - remoteReadGate: gate.New(1), - // Labelset has 57 bytes. Full chunk in test data has roughly 240 bytes. This allows us to have at max 2 chunks in this test. - remoteReadMaxBytesInFrame: 57 + 480, - } - - // Encode the request. - matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") - require.NoError(t, err) - - matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") - require.NoError(t, err) - - matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") - require.NoError(t, err) - - query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{ - Step: 1, - Func: "avg", - Start: 0, - End: 14400001, - }) - require.NoError(t, err) - - query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{ - Step: 1, - Func: "avg", - Start: 0, - End: 14400001, - }) - require.NoError(t, err) - - req := &prompb.ReadRequest{ - Queries: []*prompb.Query{query1, query2}, - AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, - } - data, err := proto.Marshal(req) - require.NoError(t, err) - - compressed := snappy.Encode(nil, data) - request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) - require.NoError(t, err) - - recorder := httptest.NewRecorder() - api.remoteRead(recorder, request) - - if recorder.Code/100 != 2 { - t.Fatal(recorder.Code) - } - - require.Equal(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type")) - require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding")) - - var results []*prompb.ChunkedReadResponse - stream := remote.NewChunkedReader(recorder.Result().Body, remote.DefaultChunkedReadLimit, nil) - for { - res := &prompb.ChunkedReadResponse{} - err := stream.NextProto(res) - if err == io.EOF { - break - } - require.NoError(t, err) - results = append(results, res) - } - - if len(results) != 5 { - t.Fatalf("Expected 5 result, got %d", len(results)) - } - - require.Equal(t, []*prompb.ChunkedReadResponse{ - { - ChunkedSeries: []*prompb.ChunkedSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar1"}, - }, - Chunks: []prompb.Chunk{ - { - Type: prompb.Chunk_XOR, - MaxTimeMs: 7140000, - Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), - }, - }, - }, - }, - }, - { - ChunkedSeries: []*prompb.ChunkedSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar2"}, - }, - Chunks: []prompb.Chunk{ - { - Type: prompb.Chunk_XOR, - MaxTimeMs: 7140000, - Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), - }, - { - Type: prompb.Chunk_XOR, - MinTimeMs: 7200000, - MaxTimeMs: 7200000, - Data: []byte("\000\001\200\364\356\006@\307p\000\000\000\000\000\000"), - }, - }, - }, - }, - }, - { - ChunkedSeries: []*prompb.ChunkedSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar3"}, - }, - Chunks: []prompb.Chunk{ - { - Type: prompb.Chunk_XOR, - MaxTimeMs: 7140000, - Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), - }, - { - Type: prompb.Chunk_XOR, - MinTimeMs: 7200000, - MaxTimeMs: 14340000, - Data: []byte("\000x\200\364\356\006@\307p\000\000\000\000\000\340\324\003\340>\224\355\260\277\322\200\372\005(=\240R\207:\003(\025\240\362\201z\003(\365\240r\203:\005(\r\241\322\201\372\r(\r\240R\237:\007(5\2402\201z\037(\025\2402\203:\005(\375\240R\200\372\r(\035\241\322\201:\003(5\240r\326g\364\271\213\227!\253q\037\312N\340GJ\033E)\375\024\241\266\362}(N\217(V\203)\336\207(\326\203(N\334W\322\203\2644\240}\005(\373AJ\031\3202\202\264\374\240\275\003(kA\3129\320R\201\2644\240\375\264\277\322\200\332\005(3\240r\207Z\003(\027\240\362\201Z\003(\363\240R\203\332\005(\017\241\322\201\332\r(\023\2402\237Z\007(7\2402\201Z\037(\023\240\322\200\332\005(\377\240R\200\332\r "), - }, - }, - }, - }, - }, - { - ChunkedSeries: []*prompb.ChunkedSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar3"}, - }, - Chunks: []prompb.Chunk{ - { - Type: prompb.Chunk_XOR, - MinTimeMs: 14400000, - MaxTimeMs: 14400000, - Data: []byte("\000\001\200\350\335\r@\327p\000\000\000\000\000\000"), - }, - }, - }, - }, - }, - { - ChunkedSeries: []*prompb.ChunkedSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar1"}, - }, - Chunks: []prompb.Chunk{ - { - Type: prompb.Chunk_XOR, - MaxTimeMs: 7140000, - Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), - }, - }, - }, - }, - QueryIndex: 1, - }, - }, results) -} - type fakeDB struct { err error } diff --git a/web/web.go b/web/web.go index 3c86a47d7c..185aeedb75 100644 --- a/web/web.go +++ b/web/web.go @@ -328,6 +328,7 @@ func New(logger log.Logger, o *Options) *Handler { h.runtimeInfo, h.versionInfo, o.Gatherer, + o.Registerer, ) if o.RoutePrefix != "/" {