Merge pull request #3453 from prometheus/grobie/remote-read-filter: Add remote read filter option

Commit a51c500e30

The change adds a required_matchers option to remote_read configuration and reworks the remote read path into composable storage.Queryable wrappers (QueryableClient, ExternablLabelsHandler, PreferLocalStorageFilter, RequiredMatchersFilter).
@@ -1496,6 +1496,10 @@ type RemoteReadConfig struct {
    // values arbitrarily into the overflow maps of further-down types.
    HTTPClientConfig HTTPClientConfig `yaml:",inline"`

    // RequiredMatchers is an optional list of equality matchers which have to
    // be present in a selector to query the remote read endpoint.
    RequiredMatchers model.LabelSet `yaml:"required_matchers,omitempty"`

    // Catches all undefined fields and must be empty after parsing.
    XXX map[string]interface{} `yaml:",inline"`
}
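For illustration only (not part of the change set): a minimal Go sketch, assuming the config package's Load helper is used as in its tests, showing that a required_matchers block in a remote_read entry surfaces as RemoteReadConfig.RequiredMatchers after parsing.

package main

import (
    "fmt"
    "log"

    "github.com/prometheus/prometheus/config"
)

func main() {
    // Same shape as the new entry in config/testdata/conf.good.yml.
    cfg, err := config.Load(`
remote_read:
  - url: http://remote3/read
    read_recent: false
    required_matchers:
      job: special
`)
    if err != nil {
        log.Fatal(err)
    }
    // The matchers are exposed as a model.LabelSet, e.g. {job="special"}.
    fmt.Println(cfg.RemoteReadConfigs[0].RequiredMatchers)
}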
@@ -85,6 +85,7 @@ var expectedConf = &Config{
        URL:              mustParseURL("http://remote3/read"),
        RemoteTimeout:    model.Duration(1 * time.Minute),
        ReadRecent:       false,
        RequiredMatchers: model.LabelSet{"job": "special"},
    },
},
config/testdata/conf.good.yml
@@ -25,6 +25,8 @@ remote_read:
    read_recent: true
  - url: http://remote3/read
    read_recent: false
    required_matchers:
      job: special

scrape_configs:
- job_name: prometheus
@@ -1134,6 +1134,11 @@ likely in future releases.
# The URL of the endpoint to query from.
url: <string>

# An optional list of equality matchers which have to be
# present in a selector to query the remote read endpoint.
required_matchers:
  [ <labelname>: <labelvalue> ... ]

# Timeout for requests to the remote read endpoint.
[ remote_timeout: <duration> | default = 30s ]
@@ -31,12 +31,11 @@ var (
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
    Queryable

    // StartTime returns the oldest timestamp stored in the storage.
    StartTime() (int64, error)

    // Querier returns a new Querier on the storage.
    Querier(ctx context.Context, mint, maxt int64) (Querier, error)

    // Appender returns a new appender against the storage.
    Appender() (Appender, error)
@@ -44,6 +43,12 @@ type Storage interface {
    Close() error
}

// A Queryable handles queries against a storage.
type Queryable interface {
    // Querier returns a new Querier on the storage.
    Querier(ctx context.Context, mint, maxt int64) (Querier, error)
}

// Querier provides reading access to time series data.
type Querier interface {
    // Select returns a set of series that matches the given label matchers.
@@ -56,6 +61,15 @@ type Querier interface {
    Close() error
}

// QueryableFunc is an adapter to allow the use of ordinary functions as
// Queryables. It follows the idea of http.HandlerFunc.
type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error)

// Querier calls f() with the given parameters.
func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
    return f(ctx, mint, maxt)
}

// Appender provides batched appends against a storage.
type Appender interface {
    Add(l labels.Labels, t int64, v float64) (uint64, error)
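As a side note (not part of the diff), the adapter works like http.HandlerFunc: any closure with the Querier signature becomes a storage.Queryable. A minimal sketch using only the types introduced above:

package main

import (
    "context"
    "fmt"

    "github.com/prometheus/prometheus/storage"
)

func main() {
    // Wrap a plain closure so that it satisfies storage.Queryable.
    queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
        // A real implementation would build a Querier restricted to [mint, maxt];
        // NoopQuerier keeps the sketch self-contained.
        return storage.NoopQuerier(), nil
    })

    q, err := queryable.Querier(context.Background(), 0, 1000)
    fmt.Println(q, err)
}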
@@ -37,18 +37,16 @@ const maxErrMsgLen = 256

// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
    index int // Used to differentiate metrics.
    index int // Used to differentiate clients in metrics.
    url     *config.URL
    client  *http.Client
    timeout time.Duration
    readRecent bool
}

// ClientConfig configures a Client.
type ClientConfig struct {
    URL              *config.URL
    Timeout          model.Duration
    ReadRecent       bool
    HTTPClientConfig config.HTTPClientConfig
}
@@ -64,7 +62,6 @@ func NewClient(index int, conf *ClientConfig) (*Client, error) {
        url:     conf.URL,
        client:  httpClient,
        timeout: time.Duration(conf.Timeout),
        readRecent: conf.ReadRecent,
    }, nil
}
@@ -158,6 +158,12 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
    }
}

type byLabel []storage.Series

func (a byLabel) Len() int           { return len(a) }
func (a byLabel) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }

// errSeriesSet implements storage.SeriesSet, just returning an error.
type errSeriesSet struct {
    err error
@@ -21,55 +21,30 @@ import (
    "github.com/prometheus/prometheus/storage"
)

// Querier returns a new Querier on the storage.
func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
    r.mtx.Lock()
    defer r.mtx.Unlock()

    queriers := make([]storage.Querier, 0, len(r.clients))
    localStartTime, err := r.localStartTimeCallback()
    if err != nil {
        return nil, err
    }
    for _, c := range r.clients {
        cmaxt := maxt
        if !c.readRecent {
            // Avoid queries whose timerange is later than the first timestamp in local DB.
            if mint > localStartTime {
                continue
            }
            // Query only samples older than the first timestamp in local DB.
            if maxt > localStartTime {
                cmaxt = localStartTime
            }
        }
        queriers = append(queriers, &querier{
// QueryableClient returns a storage.Queryable which queries the given
// Client to select series sets.
func QueryableClient(c *Client) storage.Queryable {
    return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
        return &querier{
            ctx:    ctx,
            mint:   mint,
            maxt:   cmaxt,
            maxt:   maxt,
            client: c,
            externalLabels: r.externalLabels,
        }, nil
    })
}
    return newMergeQueriers(queriers), nil
}

// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed.
var newMergeQueriers = storage.NewMergeQuerier

// Querier is an adapter to make a Client usable as a storage.Querier.
// querier is an adapter to make a Client usable as a storage.Querier.
type querier struct {
    ctx        context.Context
    mint, maxt int64
    client     *Client
    externalLabels model.LabelSet
}

// Select returns a set of series that matches the given label matchers.
// Select implements storage.Querier and uses the given matchers to read series
// sets from the Client.
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
    m, added := q.addExternalLabels(matchers)

    query, err := ToQuery(q.mint, q.maxt, m)
    query, err := ToQuery(q.mint, q.maxt, matchers)
    if err != nil {
        return errSeriesSet{err: err}
    }
@@ -79,40 +54,124 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
        return errSeriesSet{err: err}
    }

    seriesSet := FromQueryResult(res)

    return newSeriesSetFilter(seriesSet, added)
    return FromQueryResult(res)
}

type byLabel []storage.Series

func (a byLabel) Len() int           { return len(a) }
func (a byLabel) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }

// LabelValues returns all potential values for a label name.
// LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(name string) ([]string, error) {
    // TODO implement?
    return nil, nil
}

// Close releases the resources of the Querier.
// Close implements storage.Querier and is a noop.
func (q *querier) Close() error {
    return nil
}

// ExternablLabelsHandler returns a storage.Queryable which creates a
// externalLabelsQuerier.
func ExternablLabelsHandler(next storage.Queryable, externalLabels model.LabelSet) storage.Queryable {
    return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
        q, err := next.Querier(ctx, mint, maxt)
        if err != nil {
            return nil, err
        }
        return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil
    })
}

// externalLabelsQuerier is a querier which ensures that Select() results match
// the configured external labels.
type externalLabelsQuerier struct {
    storage.Querier

    externalLabels model.LabelSet
}

// Select adds equality matchers for all external labels to the list of matchers
// before calling the wrapped storage.Queryable. The added external labels are
// removed from the returned series sets.
func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
    m, added := q.addExternalLabels(matchers)
    s := q.Querier.Select(m...)
    return newSeriesSetFilter(s, added)
}

// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
// if requested timeframe can be answered completely by the local TSDB, and
// reduces maxt if the timeframe can be partially answered by TSDB.
func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable {
    return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
        localStartTime, err := cb()
        if err != nil {
            return nil, err
        }
        cmaxt := maxt
        // Avoid queries whose timerange is later than the first timestamp in local DB.
        if mint > localStartTime {
            return storage.NoopQuerier(), nil
        }
        // Query only samples older than the first timestamp in local DB.
        if maxt > localStartTime {
            cmaxt = localStartTime
        }
        return next.Querier(ctx, mint, cmaxt)
    })
}

// RequiredMatchersFilter returns a storage.Queryable which creates a
// requiredMatchersQuerier.
func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable {
    return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
        q, err := next.Querier(ctx, mint, maxt)
        if err != nil {
            return nil, err
        }
        return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil
    })
}

// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls
// to match the given labelSet.
type requiredMatchersQuerier struct {
    storage.Querier

    requiredMatchers []*labels.Matcher
}

// Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
    ms := q.requiredMatchers
    for _, m := range matchers {
        for i, r := range ms {
            if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
                ms = append(ms[:i], ms[i+1:]...)
                break
            }
        }
        if len(ms) == 0 {
            break
        }
    }
    if len(ms) > 0 {
        return storage.NoopSeriesSet()
    }
    return q.Querier.Select(matchers...)
}

// addExternalLabels adds matchers for each external label. External labels
// that already have a corresponding user-supplied matcher are skipped, as we
// assume that the user explicitly wants to select a different value for them.
// We return the new set of matchers, along with a map of labels for which
// matchers were added, so that these can later be removed from the result
// time series again.
func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
    el := make(model.LabelSet, len(q.externalLabels))
    for k, v := range q.externalLabels {
        el[k] = v
    }
    for _, m := range matchers {
    for _, m := range ms {
        if _, ok := el[model.LabelName(m.Name)]; ok {
            delete(el, model.LabelName(m.Name))
        }
@@ -122,9 +181,9 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match
        if err != nil {
            panic(err)
        }
        matchers = append(matchers, m)
        ms = append(ms, m)
    }
    return matchers, el
    return ms, el
}

func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet {
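Taken together, these wrappers are meant to be stacked per remote read endpoint, which is exactly what ApplyConfig does later in this diff. A sketch of the decorator chain, written as if it lived in the remote package; newReadQueryable is a hypothetical helper, not part of the change:

package remote

import (
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/storage"
)

// newReadQueryable mirrors the wiring done by ApplyConfig: the Client sits at
// the bottom, external labels are applied on top, then the required-matchers
// gate, then the prefer-local-storage filter.
func newReadQueryable(rrConf *config.RemoteReadConfig, externalLabels model.LabelSet, startTime startTimeCallback) (storage.Queryable, error) {
    c, err := NewClient(0, &ClientConfig{
        URL:              rrConf.URL,
        Timeout:          rrConf.RemoteTimeout,
        HTTPClientConfig: rrConf.HTTPClientConfig,
    })
    if err != nil {
        return nil, err
    }

    var q storage.Queryable
    q = QueryableClient(c)
    q = ExternablLabelsHandler(q, externalLabels)
    if len(rrConf.RequiredMatchers) > 0 {
        // Gate queries on the configured equality matchers.
        q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers))
    }
    if !rrConf.ReadRecent {
        // Skip or clamp time ranges already covered by the local TSDB.
        q = PreferLocalStorageFilter(q, startTime)
    }
    return q, nil
}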
@@ -18,10 +18,8 @@ import (
    "reflect"
    "sort"
    "testing"
    "time"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/prompb"
    "github.com/prometheus/prometheus/storage"
@@ -35,7 +33,22 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher
    return m
}

func TestAddExternalLabels(t *testing.T) {
func TestExternalLabelsQuerierSelect(t *testing.T) {
    matchers := []*labels.Matcher{
        mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
    }
    q := &externalLabelsQuerier{
        Querier:        mockQuerier{},
        externalLabels: model.LabelSet{"region": "europe"},
    }

    want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
    if have := q.Select(matchers...); !reflect.DeepEqual(want, have) {
        t.Errorf("expected series set %+v, got %+v", want, have)
    }
}

func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
    tests := []struct {
        el         model.LabelSet
        inMatchers []*labels.Matcher
@@ -80,10 +93,7 @@ func TestAddExternalLabels(t *testing.T) {
    }

    for i, test := range tests {
        q := querier{
            externalLabels: test.el,
        }

        q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el}
        matchers, added := q.addExternalLabels(test.inMatchers)

        sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name })
@@ -133,58 +143,178 @@ func TestSeriesSetFilter(t *testing.T) {
    }
}

type mockMergeQuerier struct{ queriersCount int }
type mockQuerier struct {
    ctx        context.Context
    mint, maxt int64

func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil }
func (*mockMergeQuerier) LabelValues(name string) ([]string, error)  { return nil, nil }
func (*mockMergeQuerier) Close() error                               { return nil }
    storage.Querier
}

type mockSeriesSet struct {
    storage.SeriesSet
}

func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet {
    return mockSeriesSet{}
}

func TestPreferLocalStorageFilter(t *testing.T) {
    ctx := context.Background()

func TestRemoteStorageQuerier(t *testing.T) {
    tests := []struct {
        localStartTime        int64
        readRecentClients     []bool
        mint                  int64
        maxt                  int64
        expectedQueriersCount int
        querier               storage.Querier
    }{
        {
            localStartTime:    int64(20),
            readRecentClients: []bool{true, true, false},
            localStartTime:    int64(100),
            mint:              int64(0),
            maxt:              int64(50),
            expectedQueriersCount: 3,
            querier:           mockQuerier{ctx: ctx, mint: 0, maxt: 50},
        },
        {
            localStartTime: int64(20),
            mint:           int64(0),
            maxt:           int64(50),
            querier:        mockQuerier{ctx: ctx, mint: 0, maxt: 20},
        },
        {
            localStartTime:    int64(20),
            readRecentClients: []bool{true, true, false},
            mint:              int64(30),
            maxt:              int64(50),
            expectedQueriersCount: 2,
            querier:           storage.NoopQuerier(),
        },
    }

    for i, test := range tests {
        s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil })
        s.clients = []*Client{}
        for _, readRecent := range test.readRecentClients {
            c, _ := NewClient(0, &ClientConfig{
                URL:              nil,
                Timeout:          model.Duration(30 * time.Second),
                HTTPClientConfig: config.HTTPClientConfig{},
                ReadRecent:       readRecent,
            })
            s.clients = append(s.clients, c)
        }
        // overrides mergeQuerier to mockMergeQuerier so we can reflect its type
        newMergeQueriers = func(queriers []storage.Querier) storage.Querier {
            return &mockMergeQuerier{queriersCount: len(queriers)}
        f := PreferLocalStorageFilter(
            storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
                return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
            }),
            func() (int64, error) { return test.localStartTime, nil },
        )

        q, err := f.Querier(ctx, test.mint, test.maxt)
        if err != nil {
            t.Fatal(err)
        }

        querier, _ := s.Querier(context.Background(), test.mint, test.maxt)
        actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount

        if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) {
            t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount)
        if test.querier != q {
            t.Errorf("%d. expected quierer %+v, got %+v", i, test.querier, q)
        }
    }
}

func TestRequiredMatchersFilter(t *testing.T) {
    ctx := context.Background()

    f := RequiredMatchersFilter(
        storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
            return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
        }),
        []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")},
    )

    want := &requiredMatchersQuerier{
        Querier:          mockQuerier{ctx: ctx, mint: 0, maxt: 50},
        requiredMatchers: []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")},
    }
    have, err := f.Querier(ctx, 0, 50)
    if err != nil {
        t.Fatal(err)
    }

    if !reflect.DeepEqual(want, have) {
        t.Errorf("expected quierer %+v, got %+v", want, have)
    }
}

func TestRequiredLabelsQuerierSelect(t *testing.T) {
    tests := []struct {
        requiredMatchers []*labels.Matcher
        matchers         []*labels.Matcher
        seriesSet        storage.SeriesSet
    }{
        {
            requiredMatchers: []*labels.Matcher{},
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
            },
            seriesSet: mockSeriesSet{},
        },
        {
            requiredMatchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
            },
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
            },
            seriesSet: mockSeriesSet{},
        },
        {
            requiredMatchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
            },
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchRegexp, "special", "label"),
            },
            seriesSet: storage.NoopSeriesSet(),
        },
        {
            requiredMatchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
            },
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "different"),
            },
            seriesSet: storage.NoopSeriesSet(),
        },
        {
            requiredMatchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
            },
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
                mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
            },
            seriesSet: mockSeriesSet{},
        },
        {
            requiredMatchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
                mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
            },
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
                mustNewLabelMatcher(labels.MatchEqual, "foo", "baz"),
            },
            seriesSet: storage.NoopSeriesSet(),
        },
        {
            requiredMatchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
                mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
            },
            matchers: []*labels.Matcher{
                mustNewLabelMatcher(labels.MatchEqual, "special", "label"),
                mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"),
            },
            seriesSet: mockSeriesSet{},
        },
    }

    for i, test := range tests {
        q := &requiredMatchersQuerier{
            Querier:          mockQuerier{},
            requiredMatchers: test.requiredMatchers,
        }

        if want, have := test.seriesSet, q.Select(test.matchers...); want != have {
            t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
        }
        if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {
            t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i)
        }
    }
}
@@ -14,11 +14,14 @@
package remote

import (
    "context"
    "sync"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/storage"
)

// Callback func that return the oldest timestamp stored in a storage.
@@ -34,9 +37,8 @@ type Storage struct {
    queues []*QueueManager

    // For reads
    clients                []*Client
    queryables             []storage.Queryable
    localStartTimeCallback startTimeCallback
    externalLabels         model.LabelSet
}

// NewStorage returns a remote.Storage.
@@ -86,22 +88,28 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {

    // Update read clients

    clients := []*Client{}
    s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs))
    for i, rrConf := range conf.RemoteReadConfigs {
        c, err := NewClient(i, &ClientConfig{
            URL:              rrConf.URL,
            Timeout:          rrConf.RemoteTimeout,
            HTTPClientConfig: rrConf.HTTPClientConfig,
            ReadRecent:       rrConf.ReadRecent,
        })
        if err != nil {
            return err
        }
        clients = append(clients, c)
    }

    s.clients = clients
    s.externalLabels = conf.GlobalConfig.ExternalLabels
        var q storage.Queryable
        q = QueryableClient(c)
        q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels)
        if len(rrConf.RequiredMatchers) > 0 {
            q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers))
        }
        if !rrConf.ReadRecent {
            q = PreferLocalStorageFilter(q, s.localStartTimeCallback)
        }
        s.queryables = append(s.queryables, q)
    }

    return nil
}
@@ -111,6 +119,24 @@ func (s *Storage) StartTime() (int64, error) {
    return int64(model.Latest), nil
}

// Querier returns a storage.MergeQuerier combining the remote client queriers
// of each configured remote read endpoint.
func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
    s.mtx.Lock()
    queryables := s.queryables
    s.mtx.Unlock()

    queriers := make([]storage.Querier, 0, len(queryables))
    for _, queryable := range queryables {
        q, err := queryable.Querier(ctx, mint, maxt)
        if err != nil {
            return nil, err
        }
        queriers = append(queriers, q)
    }
    return storage.NewMergeQuerier(queriers), nil
}

// Close the background processing of the storage queues.
func (s *Storage) Close() error {
    s.mtx.Lock()
@@ -122,3 +148,15 @@ func (s *Storage) Close() error {

    return nil
}

func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher {
    ms := make([]*labels.Matcher, 0, len(ls))
    for k, v := range ls {
        ms = append(ms, &labels.Matcher{
            Type:  labels.MatchEqual,
            Name:  string(k),
            Value: string(v),
        })
    }
    return ms
}
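To close, a hedged end-to-end sketch (not part of the change) of how the reworked read path behaves from the outside. The URL, time range, and start-time callback are made up, and passing a nil logger to remote.NewStorage is assumed to be tolerated as in the removed test. Because the selector below lacks the required job="special" matcher, RequiredMatchersFilter short-circuits it to an empty series set and the remote endpoint is never contacted.

package main

import (
    "context"
    "log"

    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/storage/remote"
)

func main() {
    // Hypothetical start-time callback standing in for the local TSDB.
    s := remote.NewStorage(nil, func() (int64, error) { return 0, nil })

    cfg, err := config.Load(`
remote_read:
  - url: http://remote3/read
    required_matchers:
      job: special
`)
    if err != nil {
        log.Fatal(err)
    }
    if err := s.ApplyConfig(cfg); err != nil {
        log.Fatal(err)
    }

    q, err := s.Querier(context.Background(), 0, 1000)
    if err != nil {
        log.Fatal(err)
    }
    defer q.Close()

    // No job="special" matcher here, so the required-matchers gate returns an
    // empty (noop) series set instead of issuing a remote read request.
    m, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "up")
    set := q.Select(m)
    for set.Next() {
        log.Println(set.At().Labels())
    }
}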