mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Move remote read handler to remote package. (#8536)
* Move remote read handler to remote package. This follows the pattern I started with the remote write handler. The api/v1 package is getting pretty cluttered. Moving code to other packages helps reduce this size and also makes it reusable - eg Cortex doesn't do streaming remote writes yet, and will very soon. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Deal with a nil remoteReadHandler for tests. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Remove the global metrics. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Fix test. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Review feedback. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
parent
17688550d7
commit
ce97cdd477
272
storage/remote/read_handler.go
Normal file
272
storage/remote/read_handler.go
Normal file
|
@ -0,0 +1,272 @@
|
||||||
|
// Copyright 2021 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package remote
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/pkg/gate"
|
||||||
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type readHandler struct {
|
||||||
|
logger log.Logger
|
||||||
|
queryable storage.SampleAndChunkQueryable
|
||||||
|
config func() config.Config
|
||||||
|
remoteReadSampleLimit int
|
||||||
|
remoteReadMaxBytesInFrame int
|
||||||
|
remoteReadGate *gate.Gate
|
||||||
|
queries prometheus.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReadHandler creates a http.Handler that accepts remote read requests and
|
||||||
|
// writes them to the provided queryable.
|
||||||
|
func NewReadHandler(logger log.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler {
|
||||||
|
h := &readHandler{
|
||||||
|
logger: logger,
|
||||||
|
queryable: queryable,
|
||||||
|
config: config,
|
||||||
|
remoteReadSampleLimit: remoteReadSampleLimit,
|
||||||
|
remoteReadGate: gate.New(remoteReadConcurrencyLimit),
|
||||||
|
remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
|
||||||
|
|
||||||
|
queries: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: "prometheus",
|
||||||
|
Subsystem: "api", // TODO: changes to storage in Prometheus 3.0.
|
||||||
|
Name: "remote_read_queries",
|
||||||
|
Help: "The current number of remote read queries being executed or waiting.",
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
if r != nil {
|
||||||
|
r.MustRegister(h.queries)
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
if err := h.remoteReadGate.Start(ctx); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.queries.Inc()
|
||||||
|
|
||||||
|
defer h.remoteReadGate.Done()
|
||||||
|
defer h.queries.Dec()
|
||||||
|
|
||||||
|
req, err := DecodeReadRequest(r)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
externalLabels := h.config().GlobalConfig.ExternalLabels.Map()
|
||||||
|
|
||||||
|
sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
|
||||||
|
for name, value := range externalLabels {
|
||||||
|
sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
|
||||||
|
Name: name,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Slice(sortedExternalLabels, func(i, j int) bool {
|
||||||
|
return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
|
||||||
|
})
|
||||||
|
|
||||||
|
responseType, err := NegotiateResponseType(req.AcceptedResponseTypes)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch responseType {
|
||||||
|
case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
|
||||||
|
h.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels)
|
||||||
|
default:
|
||||||
|
// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
|
||||||
|
h.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *readHandler) remoteReadSamples(
|
||||||
|
ctx context.Context,
|
||||||
|
w http.ResponseWriter,
|
||||||
|
req *prompb.ReadRequest,
|
||||||
|
externalLabels map[string]string,
|
||||||
|
sortedExternalLabels []prompb.Label,
|
||||||
|
) {
|
||||||
|
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||||
|
w.Header().Set("Content-Encoding", "snappy")
|
||||||
|
|
||||||
|
resp := prompb.ReadResponse{
|
||||||
|
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||||
|
}
|
||||||
|
for i, query := range req.Queries {
|
||||||
|
if err := func() error {
|
||||||
|
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
querier, err := h.queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := querier.Close(); err != nil {
|
||||||
|
level.Warn(h.logger).Log("msg", "Error on querier close", "err", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var hints *storage.SelectHints
|
||||||
|
if query.Hints != nil {
|
||||||
|
hints = &storage.SelectHints{
|
||||||
|
Start: query.Hints.StartMs,
|
||||||
|
End: query.Hints.EndMs,
|
||||||
|
Step: query.Hints.StepMs,
|
||||||
|
Func: query.Hints.Func,
|
||||||
|
Grouping: query.Hints.Grouping,
|
||||||
|
Range: query.Hints.RangeMs,
|
||||||
|
By: query.Hints.By,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var ws storage.Warnings
|
||||||
|
resp.Results[i], ws, err = ToQueryResult(querier.Select(false, hints, filteredMatchers...), h.remoteReadSampleLimit)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, w := range ws {
|
||||||
|
level.Warn(h.logger).Log("msg", "Warnings on remote read query", "err", w.Error())
|
||||||
|
}
|
||||||
|
for _, ts := range resp.Results[i].Timeseries {
|
||||||
|
ts.Labels = MergeLabels(ts.Labels, sortedExternalLabels)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
if httpErr, ok := err.(HTTPError); ok {
|
||||||
|
http.Error(w, httpErr.Error(), httpErr.Status())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := EncodeReadResponse(&resp, w); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
|
||||||
|
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
|
||||||
|
|
||||||
|
f, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, query := range req.Queries {
|
||||||
|
if err := func() error {
|
||||||
|
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
querier, err := h.queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := querier.Close(); err != nil {
|
||||||
|
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var hints *storage.SelectHints
|
||||||
|
if query.Hints != nil {
|
||||||
|
hints = &storage.SelectHints{
|
||||||
|
Start: query.Hints.StartMs,
|
||||||
|
End: query.Hints.EndMs,
|
||||||
|
Step: query.Hints.StepMs,
|
||||||
|
Func: query.Hints.Func,
|
||||||
|
Grouping: query.Hints.Grouping,
|
||||||
|
Range: query.Hints.RangeMs,
|
||||||
|
By: query.Hints.By,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ws, err := StreamChunkedReadResponses(
|
||||||
|
NewChunkedWriter(w, f),
|
||||||
|
int64(i),
|
||||||
|
// The streaming API has to provide the series sorted.
|
||||||
|
querier.Select(true, hints, filteredMatchers...),
|
||||||
|
sortedExternalLabels,
|
||||||
|
h.remoteReadMaxBytesInFrame,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, w := range ws {
|
||||||
|
level.Warn(h.logger).Log("msg", "Warnings on chunked remote read query", "warnings", w.Error())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
if httpErr, ok := err.(HTTPError); ok {
|
||||||
|
http.Error(w, httpErr.Error(), httpErr.Status())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
||||||
|
// to a matcher that looks for an empty label,
|
||||||
|
// as that label should not be present in the storage.
|
||||||
|
func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) {
|
||||||
|
matchers, err := FromLabelMatchers(pbMatchers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
filteredMatchers := make([]*labels.Matcher, 0, len(matchers))
|
||||||
|
for _, m := range matchers {
|
||||||
|
value := externalLabels[m.Name]
|
||||||
|
if m.Type == labels.MatchEqual && value == m.Value {
|
||||||
|
matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
filteredMatchers = append(filteredMatchers, matcher)
|
||||||
|
} else {
|
||||||
|
filteredMatchers = append(filteredMatchers, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return filteredMatchers, nil
|
||||||
|
}
|
331
storage/remote/read_handler_test.go
Normal file
331
storage/remote/read_handler_test.go
Normal file
|
@ -0,0 +1,331 @@
|
||||||
|
// Copyright 2021 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package remote
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/golang/snappy"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSampledReadEndpoint(t *testing.T) {
|
||||||
|
suite, err := promql.NewTest(t, `
|
||||||
|
load 1m
|
||||||
|
test_metric1{foo="bar",baz="qux"} 1
|
||||||
|
`)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer suite.Close()
|
||||||
|
|
||||||
|
err = suite.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
h := NewReadHandler(nil, nil, suite.Storage(), func() config.Config {
|
||||||
|
return config.Config{
|
||||||
|
GlobalConfig: config.GlobalConfig{
|
||||||
|
ExternalLabels: labels.Labels{
|
||||||
|
// We expect external labels to be added, with the source labels honored.
|
||||||
|
{Name: "baz", Value: "a"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}, 1e6, 1, 0)
|
||||||
|
|
||||||
|
// Encode the request.
|
||||||
|
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
query, err := ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
|
||||||
|
data, err := proto.Marshal(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
compressed := snappy.Encode(nil, data)
|
||||||
|
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
h.ServeHTTP(recorder, request)
|
||||||
|
|
||||||
|
if recorder.Code/100 != 2 {
|
||||||
|
t.Fatal(recorder.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, "application/x-protobuf", recorder.Result().Header.Get("Content-Type"))
|
||||||
|
require.Equal(t, "snappy", recorder.Result().Header.Get("Content-Encoding"))
|
||||||
|
|
||||||
|
// Decode the response.
|
||||||
|
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
uncompressed, err := snappy.Decode(nil, compressed)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var resp prompb.ReadResponse
|
||||||
|
err = proto.Unmarshal(uncompressed, &resp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
if len(resp.Results) != 1 {
|
||||||
|
t.Fatalf("Expected 1 result, got %d", len(resp.Results))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, &prompb.QueryResult{
|
||||||
|
Timeseries: []*prompb.TimeSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar"},
|
||||||
|
},
|
||||||
|
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, resp.Results[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamReadEndpoint(t *testing.T) {
|
||||||
|
// First with 120 samples. We expect 1 frame with 1 chunk.
|
||||||
|
// Second with 121 samples, We expect 1 frame with 2 chunks.
|
||||||
|
// Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit.
|
||||||
|
suite, err := promql.NewTest(t, `
|
||||||
|
load 1m
|
||||||
|
test_metric1{foo="bar1",baz="qux"} 0+100x119
|
||||||
|
test_metric1{foo="bar2",baz="qux"} 0+100x120
|
||||||
|
test_metric1{foo="bar3",baz="qux"} 0+100x240
|
||||||
|
`)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer suite.Close()
|
||||||
|
|
||||||
|
require.NoError(t, suite.Run())
|
||||||
|
|
||||||
|
api := NewReadHandler(nil, nil, suite.Storage(), func() config.Config {
|
||||||
|
return config.Config{
|
||||||
|
GlobalConfig: config.GlobalConfig{
|
||||||
|
ExternalLabels: labels.Labels{
|
||||||
|
// We expect external labels to be added, with the source labels honored.
|
||||||
|
{Name: "baz", Value: "a"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
1e6, 1,
|
||||||
|
// Labelset has 57 bytes. Full chunk in test data has roughly 240 bytes. This allows us to have at max 2 chunks in this test.
|
||||||
|
57+480,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Encode the request.
|
||||||
|
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
query1, err := ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{
|
||||||
|
Step: 1,
|
||||||
|
Func: "avg",
|
||||||
|
Start: 0,
|
||||||
|
End: 14400001,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
query2, err := ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{
|
||||||
|
Step: 1,
|
||||||
|
Func: "avg",
|
||||||
|
Start: 0,
|
||||||
|
End: 14400001,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req := &prompb.ReadRequest{
|
||||||
|
Queries: []*prompb.Query{query1, query2},
|
||||||
|
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
|
||||||
|
}
|
||||||
|
data, err := proto.Marshal(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
compressed := snappy.Encode(nil, data)
|
||||||
|
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
api.ServeHTTP(recorder, request)
|
||||||
|
|
||||||
|
if recorder.Code/100 != 2 {
|
||||||
|
t.Fatal(recorder.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type"))
|
||||||
|
require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding"))
|
||||||
|
|
||||||
|
var results []*prompb.ChunkedReadResponse
|
||||||
|
stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil)
|
||||||
|
for {
|
||||||
|
res := &prompb.ChunkedReadResponse{}
|
||||||
|
err := stream.NextProto(res)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
results = append(results, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(results) != 5 {
|
||||||
|
t.Fatalf("Expected 5 result, got %d", len(results))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, []*prompb.ChunkedReadResponse{
|
||||||
|
{
|
||||||
|
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar1"},
|
||||||
|
},
|
||||||
|
Chunks: []prompb.Chunk{
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MaxTimeMs: 7140000,
|
||||||
|
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar2"},
|
||||||
|
},
|
||||||
|
Chunks: []prompb.Chunk{
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MaxTimeMs: 7140000,
|
||||||
|
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MinTimeMs: 7200000,
|
||||||
|
MaxTimeMs: 7200000,
|
||||||
|
Data: []byte("\000\001\200\364\356\006@\307p\000\000\000\000\000\000"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar3"},
|
||||||
|
},
|
||||||
|
Chunks: []prompb.Chunk{
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MaxTimeMs: 7140000,
|
||||||
|
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MinTimeMs: 7200000,
|
||||||
|
MaxTimeMs: 14340000,
|
||||||
|
Data: []byte("\000x\200\364\356\006@\307p\000\000\000\000\000\340\324\003\340>\224\355\260\277\322\200\372\005(=\240R\207:\003(\025\240\362\201z\003(\365\240r\203:\005(\r\241\322\201\372\r(\r\240R\237:\007(5\2402\201z\037(\025\2402\203:\005(\375\240R\200\372\r(\035\241\322\201:\003(5\240r\326g\364\271\213\227!\253q\037\312N\340GJ\033E)\375\024\241\266\362}(N\217(V\203)\336\207(\326\203(N\334W\322\203\2644\240}\005(\373AJ\031\3202\202\264\374\240\275\003(kA\3129\320R\201\2644\240\375\264\277\322\200\332\005(3\240r\207Z\003(\027\240\362\201Z\003(\363\240R\203\332\005(\017\241\322\201\332\r(\023\2402\237Z\007(7\2402\201Z\037(\023\240\322\200\332\005(\377\240R\200\332\r "),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar3"},
|
||||||
|
},
|
||||||
|
Chunks: []prompb.Chunk{
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MinTimeMs: 14400000,
|
||||||
|
MaxTimeMs: 14400000,
|
||||||
|
Data: []byte("\000\001\200\350\335\r@\327p\000\000\000\000\000\000"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric1"},
|
||||||
|
{Name: "b", Value: "c"},
|
||||||
|
{Name: "baz", Value: "qux"},
|
||||||
|
{Name: "d", Value: "e"},
|
||||||
|
{Name: "foo", Value: "bar1"},
|
||||||
|
},
|
||||||
|
Chunks: []prompb.Chunk{
|
||||||
|
{
|
||||||
|
Type: prompb.Chunk_XOR,
|
||||||
|
MaxTimeMs: 7140000,
|
||||||
|
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
QueryIndex: 1,
|
||||||
|
},
|
||||||
|
}, results)
|
||||||
|
}
|
|
@ -23,7 +23,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
type handler struct {
|
type writeHandler struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
appendable storage.Appendable
|
appendable storage.Appendable
|
||||||
}
|
}
|
||||||
|
@ -31,13 +31,13 @@ type handler struct {
|
||||||
// NewWriteHandler creates a http.Handler that accepts remote write requests and
|
// NewWriteHandler creates a http.Handler that accepts remote write requests and
|
||||||
// writes them to the provided appendable.
|
// writes them to the provided appendable.
|
||||||
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
|
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
|
||||||
return &handler{
|
return &writeHandler{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
appendable: appendable,
|
appendable: appendable,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
req, err := DecodeWriteRequest(r.Body)
|
req, err := DecodeWriteRequest(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
||||||
|
@ -62,7 +62,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
||||||
app := h.appendable.Appender(ctx)
|
app := h.appendable.Appender(ctx)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -39,11 +39,9 @@ import (
|
||||||
"github.com/prometheus/common/route"
|
"github.com/prometheus/common/route"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/gate"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/textparse"
|
"github.com/prometheus/prometheus/pkg/textparse"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/promql/parser"
|
"github.com/prometheus/prometheus/promql/parser"
|
||||||
"github.com/prometheus/prometheus/rules"
|
"github.com/prometheus/prometheus/rules"
|
||||||
|
@ -56,11 +54,6 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/stats"
|
"github.com/prometheus/prometheus/util/stats"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
namespace = "prometheus"
|
|
||||||
subsystem = "api"
|
|
||||||
)
|
|
||||||
|
|
||||||
type status string
|
type status string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -83,12 +76,6 @@ const (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
LocalhostRepresentations = []string{"127.0.0.1", "localhost", "::1"}
|
LocalhostRepresentations = []string{"127.0.0.1", "localhost", "::1"}
|
||||||
remoteReadQueries = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: subsystem,
|
|
||||||
Name: "remote_read_queries",
|
|
||||||
Help: "The current number of remote read queries being executed or waiting.",
|
|
||||||
})
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type apiError struct {
|
type apiError struct {
|
||||||
|
@ -183,23 +170,21 @@ type API struct {
|
||||||
ready func(http.HandlerFunc) http.HandlerFunc
|
ready func(http.HandlerFunc) http.HandlerFunc
|
||||||
globalURLOptions GlobalURLOptions
|
globalURLOptions GlobalURLOptions
|
||||||
|
|
||||||
db TSDBAdminStats
|
db TSDBAdminStats
|
||||||
dbDir string
|
dbDir string
|
||||||
enableAdmin bool
|
enableAdmin bool
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
remoteReadSampleLimit int
|
CORSOrigin *regexp.Regexp
|
||||||
remoteReadMaxBytesInFrame int
|
buildInfo *PrometheusVersion
|
||||||
remoteReadGate *gate.Gate
|
runtimeInfo func() (RuntimeInfo, error)
|
||||||
CORSOrigin *regexp.Regexp
|
gatherer prometheus.Gatherer
|
||||||
buildInfo *PrometheusVersion
|
|
||||||
runtimeInfo func() (RuntimeInfo, error)
|
remoteWriteHandler http.Handler
|
||||||
gatherer prometheus.Gatherer
|
remoteReadHandler http.Handler
|
||||||
remoteWriteHandler http.Handler
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
jsoniter.RegisterTypeEncoderFunc("promql.Point", marshalPointJSON, marshalPointJSONIsEmpty)
|
jsoniter.RegisterTypeEncoderFunc("promql.Point", marshalPointJSON, marshalPointJSONIsEmpty)
|
||||||
prometheus.MustRegister(remoteReadQueries)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPI returns an initialized API type.
|
// NewAPI returns an initialized API type.
|
||||||
|
@ -225,6 +210,7 @@ func NewAPI(
|
||||||
runtimeInfo func() (RuntimeInfo, error),
|
runtimeInfo func() (RuntimeInfo, error),
|
||||||
buildInfo *PrometheusVersion,
|
buildInfo *PrometheusVersion,
|
||||||
gatherer prometheus.Gatherer,
|
gatherer prometheus.Gatherer,
|
||||||
|
registerer prometheus.Registerer,
|
||||||
) *API {
|
) *API {
|
||||||
a := &API{
|
a := &API{
|
||||||
QueryEngine: qe,
|
QueryEngine: qe,
|
||||||
|
@ -233,23 +219,22 @@ func NewAPI(
|
||||||
targetRetriever: tr,
|
targetRetriever: tr,
|
||||||
alertmanagerRetriever: ar,
|
alertmanagerRetriever: ar,
|
||||||
|
|
||||||
now: time.Now,
|
now: time.Now,
|
||||||
config: configFunc,
|
config: configFunc,
|
||||||
flagsMap: flagsMap,
|
flagsMap: flagsMap,
|
||||||
ready: readyFunc,
|
ready: readyFunc,
|
||||||
globalURLOptions: globalURLOptions,
|
globalURLOptions: globalURLOptions,
|
||||||
db: db,
|
db: db,
|
||||||
dbDir: dbDir,
|
dbDir: dbDir,
|
||||||
enableAdmin: enableAdmin,
|
enableAdmin: enableAdmin,
|
||||||
rulesRetriever: rr,
|
rulesRetriever: rr,
|
||||||
remoteReadSampleLimit: remoteReadSampleLimit,
|
logger: logger,
|
||||||
remoteReadGate: gate.New(remoteReadConcurrencyLimit),
|
CORSOrigin: CORSOrigin,
|
||||||
remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
|
runtimeInfo: runtimeInfo,
|
||||||
logger: logger,
|
buildInfo: buildInfo,
|
||||||
CORSOrigin: CORSOrigin,
|
gatherer: gatherer,
|
||||||
runtimeInfo: runtimeInfo,
|
|
||||||
buildInfo: buildInfo,
|
remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame),
|
||||||
gatherer: gatherer,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ap != nil {
|
if ap != nil {
|
||||||
|
@ -331,7 +316,6 @@ func (api *API) Register(r *route.Router) {
|
||||||
r.Put("/admin/tsdb/delete_series", wrap(api.deleteSeries))
|
r.Put("/admin/tsdb/delete_series", wrap(api.deleteSeries))
|
||||||
r.Put("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones))
|
r.Put("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones))
|
||||||
r.Put("/admin/tsdb/snapshot", wrap(api.snapshot))
|
r.Put("/admin/tsdb/snapshot", wrap(api.snapshot))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type queryData struct {
|
type queryData struct {
|
||||||
|
@ -1319,211 +1303,12 @@ func (api *API) serveTSDBStatus(*http.Request) apiFuncResult {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
// This is only really for tests - this will never be nil IRL.
|
||||||
if err := api.remoteReadGate.Start(ctx); err != nil {
|
if api.remoteReadHandler != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
api.remoteReadHandler.ServeHTTP(w, r)
|
||||||
return
|
} else {
|
||||||
|
http.Error(w, "not found", http.StatusNotFound)
|
||||||
}
|
}
|
||||||
remoteReadQueries.Inc()
|
|
||||||
|
|
||||||
defer api.remoteReadGate.Done()
|
|
||||||
defer remoteReadQueries.Dec()
|
|
||||||
|
|
||||||
req, err := remote.DecodeReadRequest(r)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
|
|
||||||
|
|
||||||
sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
|
|
||||||
for name, value := range externalLabels {
|
|
||||||
sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
|
|
||||||
Name: name,
|
|
||||||
Value: value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
sort.Slice(sortedExternalLabels, func(i, j int) bool {
|
|
||||||
return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
|
|
||||||
})
|
|
||||||
|
|
||||||
responseType, err := remote.NegotiateResponseType(req.AcceptedResponseTypes)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
switch responseType {
|
|
||||||
case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
|
|
||||||
api.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels)
|
|
||||||
default:
|
|
||||||
// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
|
|
||||||
api.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (api *API) remoteReadSamples(
|
|
||||||
ctx context.Context,
|
|
||||||
w http.ResponseWriter,
|
|
||||||
req *prompb.ReadRequest,
|
|
||||||
externalLabels map[string]string,
|
|
||||||
sortedExternalLabels []prompb.Label,
|
|
||||||
) {
|
|
||||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
|
||||||
w.Header().Set("Content-Encoding", "snappy")
|
|
||||||
|
|
||||||
resp := prompb.ReadResponse{
|
|
||||||
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
|
||||||
}
|
|
||||||
for i, query := range req.Queries {
|
|
||||||
if err := func() error {
|
|
||||||
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err := querier.Close(); err != nil {
|
|
||||||
level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var hints *storage.SelectHints
|
|
||||||
if query.Hints != nil {
|
|
||||||
hints = &storage.SelectHints{
|
|
||||||
Start: query.Hints.StartMs,
|
|
||||||
End: query.Hints.EndMs,
|
|
||||||
Step: query.Hints.StepMs,
|
|
||||||
Func: query.Hints.Func,
|
|
||||||
Grouping: query.Hints.Grouping,
|
|
||||||
Range: query.Hints.RangeMs,
|
|
||||||
By: query.Hints.By,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var ws storage.Warnings
|
|
||||||
resp.Results[i], ws, err = remote.ToQueryResult(querier.Select(false, hints, filteredMatchers...), api.remoteReadSampleLimit)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, w := range ws {
|
|
||||||
level.Warn(api.logger).Log("msg", "Warnings on remote read query", "err", w.Error())
|
|
||||||
}
|
|
||||||
for _, ts := range resp.Results[i].Timeseries {
|
|
||||||
ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}(); err != nil {
|
|
||||||
if httpErr, ok := err.(remote.HTTPError); ok {
|
|
||||||
http.Error(w, httpErr.Error(), httpErr.Status())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := remote.EncodeReadResponse(&resp, w); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
|
|
||||||
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
|
|
||||||
|
|
||||||
f, ok := w.(http.Flusher)
|
|
||||||
if !ok {
|
|
||||||
http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, query := range req.Queries {
|
|
||||||
if err := func() error {
|
|
||||||
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
querier, err := api.Queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err := querier.Close(); err != nil {
|
|
||||||
level.Warn(api.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var hints *storage.SelectHints
|
|
||||||
if query.Hints != nil {
|
|
||||||
hints = &storage.SelectHints{
|
|
||||||
Start: query.Hints.StartMs,
|
|
||||||
End: query.Hints.EndMs,
|
|
||||||
Step: query.Hints.StepMs,
|
|
||||||
Func: query.Hints.Func,
|
|
||||||
Grouping: query.Hints.Grouping,
|
|
||||||
Range: query.Hints.RangeMs,
|
|
||||||
By: query.Hints.By,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ws, err := remote.StreamChunkedReadResponses(
|
|
||||||
remote.NewChunkedWriter(w, f),
|
|
||||||
int64(i),
|
|
||||||
// The streaming API has to provide the series sorted.
|
|
||||||
querier.Select(true, hints, filteredMatchers...),
|
|
||||||
sortedExternalLabels,
|
|
||||||
api.remoteReadMaxBytesInFrame,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, w := range ws {
|
|
||||||
level.Warn(api.logger).Log("msg", "Warnings on chunked remote read query", "warnings", w.Error())
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}(); err != nil {
|
|
||||||
if httpErr, ok := err.(remote.HTTPError); ok {
|
|
||||||
http.Error(w, httpErr.Error(), httpErr.Status())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
|
||||||
// to a matcher that looks for an empty label,
|
|
||||||
// as that label should not be present in the storage.
|
|
||||||
func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) {
|
|
||||||
matchers, err := remote.FromLabelMatchers(pbMatchers)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
filteredMatchers := make([]*labels.Matcher, 0, len(matchers))
|
|
||||||
for _, m := range matchers {
|
|
||||||
value := externalLabels[m.Name]
|
|
||||||
if m.Type == labels.MatchEqual && value == m.Value {
|
|
||||||
matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
filteredMatchers = append(filteredMatchers, matcher)
|
|
||||||
} else {
|
|
||||||
filteredMatchers = append(filteredMatchers, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return filteredMatchers, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
|
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
|
@ -14,11 +14,9 @@
|
||||||
package v1
|
package v1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -33,8 +31,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/gogo/protobuf/proto"
|
|
||||||
"github.com/golang/snappy"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
config_util "github.com/prometheus/common/config"
|
config_util "github.com/prometheus/common/config"
|
||||||
|
@ -44,7 +40,6 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/gate"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/textparse"
|
"github.com/prometheus/prometheus/pkg/textparse"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
|
@ -1954,314 +1949,6 @@ func assertAPIResponseLength(t *testing.T, got interface{}, expLen int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSampledReadEndpoint(t *testing.T) {
|
|
||||||
suite, err := promql.NewTest(t, `
|
|
||||||
load 1m
|
|
||||||
test_metric1{foo="bar",baz="qux"} 1
|
|
||||||
`)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
defer suite.Close()
|
|
||||||
|
|
||||||
err = suite.Run()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
api := &API{
|
|
||||||
Queryable: suite.Storage(),
|
|
||||||
QueryEngine: suite.QueryEngine(),
|
|
||||||
config: func() config.Config {
|
|
||||||
return config.Config{
|
|
||||||
GlobalConfig: config.GlobalConfig{
|
|
||||||
ExternalLabels: labels.Labels{
|
|
||||||
// We expect external labels to be added, with the source labels honored.
|
|
||||||
{Name: "baz", Value: "a"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
remoteReadSampleLimit: 1e6,
|
|
||||||
remoteReadGate: gate.New(1),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode the request.
|
|
||||||
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
|
|
||||||
data, err := proto.Marshal(req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
compressed := snappy.Encode(nil, data)
|
|
||||||
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
api.remoteRead(recorder, request)
|
|
||||||
|
|
||||||
if recorder.Code/100 != 2 {
|
|
||||||
t.Fatal(recorder.Code)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, "application/x-protobuf", recorder.Result().Header.Get("Content-Type"))
|
|
||||||
require.Equal(t, "snappy", recorder.Result().Header.Get("Content-Encoding"))
|
|
||||||
|
|
||||||
// Decode the response.
|
|
||||||
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
uncompressed, err := snappy.Decode(nil, compressed)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var resp prompb.ReadResponse
|
|
||||||
err = proto.Unmarshal(uncompressed, &resp)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
if len(resp.Results) != 1 {
|
|
||||||
t.Fatalf("Expected 1 result, got %d", len(resp.Results))
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, &prompb.QueryResult{
|
|
||||||
Timeseries: []*prompb.TimeSeries{
|
|
||||||
{
|
|
||||||
Labels: []prompb.Label{
|
|
||||||
{Name: "__name__", Value: "test_metric1"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "baz", Value: "qux"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
{Name: "foo", Value: "bar"},
|
|
||||||
},
|
|
||||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, resp.Results[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStreamReadEndpoint(t *testing.T) {
|
|
||||||
// First with 120 samples. We expect 1 frame with 1 chunk.
|
|
||||||
// Second with 121 samples, We expect 1 frame with 2 chunks.
|
|
||||||
// Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit.
|
|
||||||
suite, err := promql.NewTest(t, `
|
|
||||||
load 1m
|
|
||||||
test_metric1{foo="bar1",baz="qux"} 0+100x119
|
|
||||||
test_metric1{foo="bar2",baz="qux"} 0+100x120
|
|
||||||
test_metric1{foo="bar3",baz="qux"} 0+100x240
|
|
||||||
`)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
defer suite.Close()
|
|
||||||
|
|
||||||
require.NoError(t, suite.Run())
|
|
||||||
|
|
||||||
api := &API{
|
|
||||||
Queryable: suite.Storage(),
|
|
||||||
QueryEngine: suite.QueryEngine(),
|
|
||||||
config: func() config.Config {
|
|
||||||
return config.Config{
|
|
||||||
GlobalConfig: config.GlobalConfig{
|
|
||||||
ExternalLabels: labels.Labels{
|
|
||||||
// We expect external labels to be added, with the source labels honored.
|
|
||||||
{Name: "baz", Value: "a"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
remoteReadSampleLimit: 1e6,
|
|
||||||
remoteReadGate: gate.New(1),
|
|
||||||
// Labelset has 57 bytes. Full chunk in test data has roughly 240 bytes. This allows us to have at max 2 chunks in this test.
|
|
||||||
remoteReadMaxBytesInFrame: 57 + 480,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode the request.
|
|
||||||
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{
|
|
||||||
Step: 1,
|
|
||||||
Func: "avg",
|
|
||||||
Start: 0,
|
|
||||||
End: 14400001,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{
|
|
||||||
Step: 1,
|
|
||||||
Func: "avg",
|
|
||||||
Start: 0,
|
|
||||||
End: 14400001,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
req := &prompb.ReadRequest{
|
|
||||||
Queries: []*prompb.Query{query1, query2},
|
|
||||||
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
|
|
||||||
}
|
|
||||||
data, err := proto.Marshal(req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
compressed := snappy.Encode(nil, data)
|
|
||||||
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
api.remoteRead(recorder, request)
|
|
||||||
|
|
||||||
if recorder.Code/100 != 2 {
|
|
||||||
t.Fatal(recorder.Code)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type"))
|
|
||||||
require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding"))
|
|
||||||
|
|
||||||
var results []*prompb.ChunkedReadResponse
|
|
||||||
stream := remote.NewChunkedReader(recorder.Result().Body, remote.DefaultChunkedReadLimit, nil)
|
|
||||||
for {
|
|
||||||
res := &prompb.ChunkedReadResponse{}
|
|
||||||
err := stream.NextProto(res)
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
require.NoError(t, err)
|
|
||||||
results = append(results, res)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(results) != 5 {
|
|
||||||
t.Fatalf("Expected 5 result, got %d", len(results))
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, []*prompb.ChunkedReadResponse{
|
|
||||||
{
|
|
||||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
|
||||||
{
|
|
||||||
Labels: []prompb.Label{
|
|
||||||
{Name: "__name__", Value: "test_metric1"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "baz", Value: "qux"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
{Name: "foo", Value: "bar1"},
|
|
||||||
},
|
|
||||||
Chunks: []prompb.Chunk{
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MaxTimeMs: 7140000,
|
|
||||||
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
|
||||||
{
|
|
||||||
Labels: []prompb.Label{
|
|
||||||
{Name: "__name__", Value: "test_metric1"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "baz", Value: "qux"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
{Name: "foo", Value: "bar2"},
|
|
||||||
},
|
|
||||||
Chunks: []prompb.Chunk{
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MaxTimeMs: 7140000,
|
|
||||||
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MinTimeMs: 7200000,
|
|
||||||
MaxTimeMs: 7200000,
|
|
||||||
Data: []byte("\000\001\200\364\356\006@\307p\000\000\000\000\000\000"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
|
||||||
{
|
|
||||||
Labels: []prompb.Label{
|
|
||||||
{Name: "__name__", Value: "test_metric1"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "baz", Value: "qux"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
{Name: "foo", Value: "bar3"},
|
|
||||||
},
|
|
||||||
Chunks: []prompb.Chunk{
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MaxTimeMs: 7140000,
|
|
||||||
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MinTimeMs: 7200000,
|
|
||||||
MaxTimeMs: 14340000,
|
|
||||||
Data: []byte("\000x\200\364\356\006@\307p\000\000\000\000\000\340\324\003\340>\224\355\260\277\322\200\372\005(=\240R\207:\003(\025\240\362\201z\003(\365\240r\203:\005(\r\241\322\201\372\r(\r\240R\237:\007(5\2402\201z\037(\025\2402\203:\005(\375\240R\200\372\r(\035\241\322\201:\003(5\240r\326g\364\271\213\227!\253q\037\312N\340GJ\033E)\375\024\241\266\362}(N\217(V\203)\336\207(\326\203(N\334W\322\203\2644\240}\005(\373AJ\031\3202\202\264\374\240\275\003(kA\3129\320R\201\2644\240\375\264\277\322\200\332\005(3\240r\207Z\003(\027\240\362\201Z\003(\363\240R\203\332\005(\017\241\322\201\332\r(\023\2402\237Z\007(7\2402\201Z\037(\023\240\322\200\332\005(\377\240R\200\332\r "),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
|
||||||
{
|
|
||||||
Labels: []prompb.Label{
|
|
||||||
{Name: "__name__", Value: "test_metric1"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "baz", Value: "qux"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
{Name: "foo", Value: "bar3"},
|
|
||||||
},
|
|
||||||
Chunks: []prompb.Chunk{
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MinTimeMs: 14400000,
|
|
||||||
MaxTimeMs: 14400000,
|
|
||||||
Data: []byte("\000\001\200\350\335\r@\327p\000\000\000\000\000\000"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
|
||||||
{
|
|
||||||
Labels: []prompb.Label{
|
|
||||||
{Name: "__name__", Value: "test_metric1"},
|
|
||||||
{Name: "b", Value: "c"},
|
|
||||||
{Name: "baz", Value: "qux"},
|
|
||||||
{Name: "d", Value: "e"},
|
|
||||||
{Name: "foo", Value: "bar1"},
|
|
||||||
},
|
|
||||||
Chunks: []prompb.Chunk{
|
|
||||||
{
|
|
||||||
Type: prompb.Chunk_XOR,
|
|
||||||
MaxTimeMs: 7140000,
|
|
||||||
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
QueryIndex: 1,
|
|
||||||
},
|
|
||||||
}, results)
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakeDB struct {
|
type fakeDB struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
|
@ -328,6 +328,7 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
h.runtimeInfo,
|
h.runtimeInfo,
|
||||||
h.versionInfo,
|
h.versionInfo,
|
||||||
o.Gatherer,
|
o.Gatherer,
|
||||||
|
o.Registerer,
|
||||||
)
|
)
|
||||||
|
|
||||||
if o.RoutePrefix != "/" {
|
if o.RoutePrefix != "/" {
|
||||||
|
|
Loading…
Reference in a new issue