mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
parent
3569eef8b1
commit
c25f7c600b
|
@ -496,6 +496,23 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
OldestInclusive: from,
|
OldestInclusive: from,
|
||||||
NewestInclusive: through,
|
NewestInclusive: through,
|
||||||
}))
|
}))
|
||||||
|
externalLabels := api.config().GlobalConfig.ExternalLabels.Clone()
|
||||||
|
for _, ts := range resp.Results[i].Timeseries {
|
||||||
|
globalUsed := map[string]struct{}{}
|
||||||
|
for _, l := range ts.Labels {
|
||||||
|
if _, ok := externalLabels[model.LabelName(l.Name)]; ok {
|
||||||
|
globalUsed[l.Name] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for ln, lv := range externalLabels {
|
||||||
|
if _, ok := globalUsed[string(ln)]; !ok {
|
||||||
|
ts.Labels = append(ts.Labels, &remote.LabelPair{
|
||||||
|
Name: string(ln),
|
||||||
|
Value: string(lv),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := remote.EncodeReadResponse(&resp, w); err != nil {
|
if err := remote.EncodeReadResponse(&resp, w); err != nil {
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package v1
|
package v1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -25,6 +26,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/common/route"
|
"github.com/prometheus/common/route"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
@ -32,6 +35,8 @@ import (
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/retrieval"
|
"github.com/prometheus/prometheus/retrieval"
|
||||||
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
type targetRetrieverFunc func() []*retrieval.Target
|
type targetRetrieverFunc func() []*retrieval.Target
|
||||||
|
@ -532,6 +537,89 @@ func TestEndpoints(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadEndpoint(t *testing.T) {
|
||||||
|
suite, err := promql.NewTest(t, `
|
||||||
|
load 1m
|
||||||
|
test_metric1{foo="bar",baz="qux"} 1
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer suite.Close()
|
||||||
|
|
||||||
|
if err := suite.Run(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
api := &API{
|
||||||
|
Storage: suite.Storage(),
|
||||||
|
QueryEngine: suite.QueryEngine(),
|
||||||
|
config: func() config.Config {
|
||||||
|
return config.Config{
|
||||||
|
GlobalConfig: config.GlobalConfig{
|
||||||
|
ExternalLabels: model.LabelSet{
|
||||||
|
"baz": "a",
|
||||||
|
"b": "c",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode the request.
|
||||||
|
matcher, err := metric.NewLabelMatcher(metric.Equal, "__name__", "test_metric1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
query, err := remote.ToQuery(0, 1, metric.LabelMatchers{matcher})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
req := &remote.ReadRequest{Queries: []*remote.Query{query}}
|
||||||
|
data, err := proto.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
compressed := snappy.Encode(nil, data)
|
||||||
|
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
api.remoteRead(recorder, request)
|
||||||
|
|
||||||
|
// Decode the response.
|
||||||
|
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
uncompressed, err := snappy.Decode(nil, compressed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp remote.ReadResponse
|
||||||
|
err = proto.Unmarshal(uncompressed, &resp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Results) != 1 {
|
||||||
|
t.Fatalf("Expected 1 result, got %d", len(resp.Results))
|
||||||
|
}
|
||||||
|
|
||||||
|
result := remote.FromQueryResult(resp.Results[0])
|
||||||
|
expected := &model.Matrix{
|
||||||
|
&model.SampleStream{
|
||||||
|
Metric: model.Metric{"__name__": "test_metric1", "b": "c", "baz": "qux", "foo": "bar"},
|
||||||
|
Values: []model.SamplePair{model.SamplePair{Value: 1, Timestamp: 0}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(&result, expected) {
|
||||||
|
t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRespondSuccess(t *testing.T) {
|
func TestRespondSuccess(t *testing.T) {
|
||||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
respond(w, "test")
|
respond(w, "test")
|
||||||
|
|
Loading…
Reference in a new issue