mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-25 12:42:47 -08:00
Add remote read filter option
For special remote read endpoints which have only data for specific queries, it is desired to limit the number of queries sent to the configured remote read endpoint to reduce latency and performance overhead.
This commit is contained in:
parent
434f0374f7
commit
7098c56474
|
@ -1496,6 +1496,10 @@ type RemoteReadConfig struct {
|
|||
// values arbitrarily into the overflow maps of further-down types.
|
||||
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
|
||||
|
||||
// RequiredMatchers is an optional list of equality matchers which have to
|
||||
// be present in a selector to query the remote read endpoint.
|
||||
RequiredMatchers model.LabelSet `yaml:"required_matchers,omitempty"`
|
||||
|
||||
// Catches all undefined fields and must be empty after parsing.
|
||||
XXX map[string]interface{} `yaml:",inline"`
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ var expectedConf = &Config{
|
|||
URL: mustParseURL("http://remote3/read"),
|
||||
RemoteTimeout: model.Duration(1 * time.Minute),
|
||||
ReadRecent: false,
|
||||
RequiredMatchers: model.LabelSet{"job": "special"},
|
||||
},
|
||||
},
|
||||
|
||||
|
|
2
config/testdata/conf.good.yml
vendored
2
config/testdata/conf.good.yml
vendored
|
@ -25,6 +25,8 @@ remote_read:
|
|||
read_recent: true
|
||||
- url: http://remote3/read
|
||||
read_recent: false
|
||||
required_matchers:
|
||||
job: special
|
||||
|
||||
scrape_configs:
|
||||
- job_name: prometheus
|
||||
|
|
|
@ -1134,6 +1134,11 @@ likely in future releases.
|
|||
# The URL of the endpoint to query from.
|
||||
url: <string>
|
||||
|
||||
# An optional list of equality matchers which have to be
|
||||
# present in a selector to query the remote read endpoint.
|
||||
required_matchers:
|
||||
[ <labelname>: <labelvalue> ... ]
|
||||
|
||||
# Timeout for requests to the remote read endpoint.
|
||||
[ remote_timeout: <duration> | default = 30s ]
|
||||
|
||||
|
|
|
@ -119,6 +119,47 @@ func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) stor
|
|||
})
|
||||
}
|
||||
|
||||
// RequiredMatchersFilter returns a storage.Queryable which creates a
|
||||
// requiredMatchersQuerier.
|
||||
func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable {
|
||||
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
q, err := next.Querier(ctx, mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil
|
||||
})
|
||||
}
|
||||
|
||||
// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls
|
||||
// to match the given labelSet.
|
||||
type requiredMatchersQuerier struct {
|
||||
storage.Querier
|
||||
|
||||
requiredMatchers []*labels.Matcher
|
||||
}
|
||||
|
||||
// Select returns a NoopSeriesSet if the given matchers don't match the label
|
||||
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
|
||||
func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
ms := q.requiredMatchers
|
||||
for _, m := range matchers {
|
||||
for i, r := range ms {
|
||||
if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
|
||||
ms = append(ms[:i], ms[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(ms) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(ms) > 0 {
|
||||
return storage.NoopSeriesSet()
|
||||
}
|
||||
return q.Querier.Select(matchers...)
|
||||
}
|
||||
|
||||
// addExternalLabels adds matchers for each external label. External labels
|
||||
// that already have a corresponding user-supplied matcher are skipped, as we
|
||||
// assume that the user explicitly wants to select a different value for them.
|
||||
|
|
|
@ -143,13 +143,21 @@ func TestSeriesSetFilter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type testQuerier struct {
|
||||
type mockQuerier struct {
|
||||
ctx context.Context
|
||||
mint, maxt int64
|
||||
|
||||
storage.Querier
|
||||
}
|
||||
|
||||
type mockSeriesSet struct {
|
||||
storage.SeriesSet
|
||||
}
|
||||
|
||||
func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet {
|
||||
return mockSeriesSet{}
|
||||
}
|
||||
|
||||
func TestPreferLocalStorageFilter(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
|
@ -163,13 +171,13 @@ func TestPreferLocalStorageFilter(t *testing.T) {
|
|||
localStartTime: int64(100),
|
||||
mint: int64(0),
|
||||
maxt: int64(50),
|
||||
querier: testQuerier{ctx: ctx, mint: 0, maxt: 50},
|
||||
querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50},
|
||||
},
|
||||
{
|
||||
localStartTime: int64(20),
|
||||
mint: int64(0),
|
||||
maxt: int64(50),
|
||||
querier: testQuerier{ctx: ctx, mint: 0, maxt: 20},
|
||||
querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20},
|
||||
},
|
||||
{
|
||||
localStartTime: int64(20),
|
||||
|
@ -182,7 +190,7 @@ func TestPreferLocalStorageFilter(t *testing.T) {
|
|||
for i, test := range tests {
|
||||
f := PreferLocalStorageFilter(
|
||||
storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
return testQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
|
||||
return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
|
||||
}),
|
||||
func() (int64, error) { return test.localStartTime, nil },
|
||||
)
|
||||
|
@ -197,3 +205,116 @@ func TestPreferLocalStorageFilter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequiredMatchersFilter(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
f := RequiredMatchersFilter(
|
||||
storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
|
||||
}),
|
||||
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")},
|
||||
)
|
||||
|
||||
want := &requiredMatchersQuerier{
|
||||
Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50},
|
||||
requiredMatchers: []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")},
|
||||
}
|
||||
have, err := f.Querier(ctx, 0, 50)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("expected quierer %+v, got %+v", want, have)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequiredLabelsQuerierSelect(t *testing.T) {
|
||||
tests := []struct {
|
||||
requiredMatchers []*labels.Matcher
|
||||
matchers []*labels.Matcher
|
||||
seriesSet storage.SeriesSet
|
||||
}{
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
},
|
||||
seriesSet: mockSeriesSet{},
|
||||
},
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
},
|
||||
seriesSet: mockSeriesSet{},
|
||||
},
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchRegexp, "special", "label"),
|
||||
},
|
||||
seriesSet: storage.NoopSeriesSet(),
|
||||
},
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "different"),
|
||||
},
|
||||
seriesSet: storage.NoopSeriesSet(),
|
||||
},
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
seriesSet: mockSeriesSet{},
|
||||
},
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
mustNewLabelMatcher(labels.MatchEqual, "foo", "baz"),
|
||||
},
|
||||
seriesSet: storage.NoopSeriesSet(),
|
||||
},
|
||||
{
|
||||
requiredMatchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
matchers: []*labels.Matcher{
|
||||
mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
|
||||
mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
|
||||
},
|
||||
seriesSet: mockSeriesSet{},
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
q := &requiredMatchersQuerier{
|
||||
Querier: mockQuerier{},
|
||||
requiredMatchers: test.requiredMatchers,
|
||||
}
|
||||
|
||||
if want, have := test.seriesSet, q.Select(test.matchers...); want != have {
|
||||
t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
|
||||
}
|
||||
if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {
|
||||
t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
|
@ -101,6 +102,9 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
|||
var q storage.Queryable
|
||||
q = QueryableClient(c)
|
||||
q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels)
|
||||
if len(rrConf.RequiredMatchers) > 0 {
|
||||
q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers))
|
||||
}
|
||||
if !rrConf.ReadRecent {
|
||||
q = PreferLocalStorageFilter(q, s.localStartTimeCallback)
|
||||
}
|
||||
|
@ -144,3 +148,15 @@ func (s *Storage) Close() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher {
|
||||
ms := make([]*labels.Matcher, 0, len(ls))
|
||||
for k, v := range ls {
|
||||
ms = append(ms, &labels.Matcher{
|
||||
Type: labels.MatchEqual,
|
||||
Name: string(k),
|
||||
Value: string(v),
|
||||
})
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue