From 9b0091d487cfadc7f6b967178a31193006cc1dbb Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Sun, 12 Nov 2017 01:27:45 +0100 Subject: [PATCH 1/3] Add storage.Queryable and storage.QueryableFunc In order to compose different querier implementations more easily, this change introduces a separate storage.Queryable interface grouping the query (Querier) function of the storage. Furthermore, it adds a QueryableFunc type to ease writing very simple queryable implementations. --- storage/interface.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/storage/interface.go b/storage/interface.go index 2b21792520..f9bfc6a27f 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -31,12 +31,11 @@ var ( // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { + Queryable + // StartTime returns the oldest timestamp stored in the storage. StartTime() (int64, error) - // Querier returns a new Querier on the storage. - Querier(ctx context.Context, mint, maxt int64) (Querier, error) - // Appender returns a new appender against the storage. Appender() (Appender, error) @@ -44,6 +43,12 @@ type Storage interface { Close() error } +// A Queryable handles queries against a storage. +type Queryable interface { + // Querier returns a new Querier on the storage. + Querier(ctx context.Context, mint, maxt int64) (Querier, error) +} + // Querier provides reading access to time series data. type Querier interface { // Select returns a set of series that matches the given label matchers. @@ -56,6 +61,15 @@ type Querier interface { Close() error } +// QueryableFunc is an adapter to allow the use of ordinary functions as +// Queryables. It follows the idea of http.HandlerFunc. +type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error) + +// Querier calls f() with the given parameters. +func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { + return f(ctx, mint, maxt) +} + // Appender provides batched appends against a storage. type Appender interface { Add(l labels.Labels, t int64, v float64) (uint64, error) From 434f0374f72d8ba77a75429f985c51f0cc650c16 Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Sun, 12 Nov 2017 02:15:27 +0100 Subject: [PATCH 2/3] Refactor remote storage querier handling * Decouple remote client from ReadRecent feature. * Separate remote read filter into a small, testable function. * Use storage.Queryable interface to compose independent functionalities. --- storage/remote/client.go | 19 +++-- storage/remote/codec.go | 6 ++ storage/remote/read.go | 134 ++++++++++++++++++++---------------- storage/remote/read_test.go | 101 ++++++++++++++------------- storage/remote/storage.go | 38 +++++++--- 5 files changed, 175 insertions(+), 123 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index bba984bc8c..8da4d84522 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -37,18 +37,16 @@ const maxErrMsgLen = 256 // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - index int // Used to differentiate metrics. - url *config.URL - client *http.Client - timeout time.Duration - readRecent bool + index int // Used to differentiate clients in metrics. + url *config.URL + client *http.Client + timeout time.Duration } // ClientConfig configures a Client. type ClientConfig struct { URL *config.URL Timeout model.Duration - ReadRecent bool HTTPClientConfig config.HTTPClientConfig } @@ -60,11 +58,10 @@ func NewClient(index int, conf *ClientConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.URL, - client: httpClient, - timeout: time.Duration(conf.Timeout), - readRecent: conf.ReadRecent, + index: index, + url: conf.URL, + client: httpClient, + timeout: time.Duration(conf.Timeout), }, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 5b966acba3..0fa25f36c0 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -158,6 +158,12 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { } } +type byLabel []storage.Series + +func (a byLabel) Len() int { return len(a) } +func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } + // errSeriesSet implements storage.SeriesSet, just returning an error. type errSeriesSet struct { err error diff --git a/storage/remote/read.go b/storage/remote/read.go index c466e7ddb0..c3753c3780 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -21,55 +21,30 @@ import ( "github.com/prometheus/prometheus/storage" ) -// Querier returns a new Querier on the storage. -func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - r.mtx.Lock() - defer r.mtx.Unlock() - - queriers := make([]storage.Querier, 0, len(r.clients)) - localStartTime, err := r.localStartTimeCallback() - if err != nil { - return nil, err - } - for _, c := range r.clients { - cmaxt := maxt - if !c.readRecent { - // Avoid queries whose timerange is later than the first timestamp in local DB. - if mint > localStartTime { - continue - } - // Query only samples older than the first timestamp in local DB. - if maxt > localStartTime { - cmaxt = localStartTime - } - } - queriers = append(queriers, &querier{ - ctx: ctx, - mint: mint, - maxt: cmaxt, - client: c, - externalLabels: r.externalLabels, - }) - } - return newMergeQueriers(queriers), nil +// QueryableClient returns a storage.Queryable which queries the given +// Client to select series sets. +func QueryableClient(c *Client) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &querier{ + ctx: ctx, + mint: mint, + maxt: maxt, + client: c, + }, nil + }) } -// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed. -var newMergeQueriers = storage.NewMergeQuerier - -// Querier is an adapter to make a Client usable as a storage.Querier. +// querier is an adapter to make a Client usable as a storage.Querier. type querier struct { - ctx context.Context - mint, maxt int64 - client *Client - externalLabels model.LabelSet + ctx context.Context + mint, maxt int64 + client *Client } -// Select returns a set of series that matches the given label matchers. +// Select implements storage.Querier and uses the given matchers to read series +// sets from the Client. func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { - m, added := q.addExternalLabels(matchers) - - query, err := ToQuery(q.mint, q.maxt, m) + query, err := ToQuery(q.mint, q.maxt, matchers) if err != nil { return errSeriesSet{err: err} } @@ -79,40 +54,83 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { return errSeriesSet{err: err} } - seriesSet := FromQueryResult(res) - - return newSeriesSetFilter(seriesSet, added) + return FromQueryResult(res) } -type byLabel []storage.Series - -func (a byLabel) Len() int { return len(a) } -func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } - -// LabelValues returns all potential values for a label name. +// LabelValues implements storage.Querier and is a noop. func (q *querier) LabelValues(name string) ([]string, error) { // TODO implement? return nil, nil } -// Close releases the resources of the Querier. +// Close implements storage.Querier and is a noop. func (q *querier) Close() error { return nil } +// ExternablLabelsHandler returns a storage.Queryable which creates a +// externalLabelsQuerier. +func ExternablLabelsHandler(next storage.Queryable, externalLabels model.LabelSet) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil + }) +} + +// externalLabelsQuerier is a querier which ensures that Select() results match +// the configured external labels. +type externalLabelsQuerier struct { + storage.Querier + + externalLabels model.LabelSet +} + +// Select adds equality matchers for all external labels to the list of matchers +// before calling the wrapped storage.Queryable. The added external labels are +// removed from the returned series sets. +func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { + m, added := q.addExternalLabels(matchers) + s := q.Querier.Select(m...) + return newSeriesSetFilter(s, added) +} + +// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier +// if requested timeframe can be answered completely by the local TSDB, and +// reduces maxt if the timeframe can be partially answered by TSDB. +func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + localStartTime, err := cb() + if err != nil { + return nil, err + } + cmaxt := maxt + // Avoid queries whose timerange is later than the first timestamp in local DB. + if mint > localStartTime { + return storage.NoopQuerier(), nil + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + return next.Querier(ctx, mint, cmaxt) + }) +} + // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. // We return the new set of matchers, along with a map of labels for which // matchers were added, so that these can later be removed from the result // time series again. -func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { +func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { el := make(model.LabelSet, len(q.externalLabels)) for k, v := range q.externalLabels { el[k] = v } - for _, m := range matchers { + for _, m := range ms { if _, ok := el[model.LabelName(m.Name)]; ok { delete(el, model.LabelName(m.Name)) } @@ -122,9 +140,9 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match if err != nil { panic(err) } - matchers = append(matchers, m) + ms = append(ms, m) } - return matchers, el + return ms, el } func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 2fce331897..e8ba06f23f 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -18,10 +18,8 @@ import ( "reflect" "sort" "testing" - "time" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -35,7 +33,22 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher return m } -func TestAddExternalLabels(t *testing.T) { +func TestExternalLabelsQuerierSelect(t *testing.T) { + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + } + q := &externalLabelsQuerier{ + Querier: mockQuerier{}, + externalLabels: model.LabelSet{"region": "europe"}, + } + + want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) + if have := q.Select(matchers...); !reflect.DeepEqual(want, have) { + t.Errorf("expected series set %+v, got %+v", want, have) + } +} + +func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { tests := []struct { el model.LabelSet inMatchers []*labels.Matcher @@ -80,10 +93,7 @@ func TestAddExternalLabels(t *testing.T) { } for i, test := range tests { - q := querier{ - externalLabels: test.el, - } - + q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el} matchers, added := q.addExternalLabels(test.inMatchers) sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) @@ -133,58 +143,57 @@ func TestSeriesSetFilter(t *testing.T) { } } -type mockMergeQuerier struct{ queriersCount int } +type testQuerier struct { + ctx context.Context + mint, maxt int64 -func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil } -func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil } -func (*mockMergeQuerier) Close() error { return nil } + storage.Querier +} + +func TestPreferLocalStorageFilter(t *testing.T) { + ctx := context.Background() -func TestRemoteStorageQuerier(t *testing.T) { tests := []struct { - localStartTime int64 - readRecentClients []bool - mint int64 - maxt int64 - expectedQueriersCount int + localStartTime int64 + mint int64 + maxt int64 + querier storage.Querier }{ { - localStartTime: int64(20), - readRecentClients: []bool{true, true, false}, - mint: int64(0), - maxt: int64(50), - expectedQueriersCount: 3, + localStartTime: int64(100), + mint: int64(0), + maxt: int64(50), + querier: testQuerier{ctx: ctx, mint: 0, maxt: 50}, }, { - localStartTime: int64(20), - readRecentClients: []bool{true, true, false}, - mint: int64(30), - maxt: int64(50), - expectedQueriersCount: 2, + localStartTime: int64(20), + mint: int64(0), + maxt: int64(50), + querier: testQuerier{ctx: ctx, mint: 0, maxt: 20}, + }, + { + localStartTime: int64(20), + mint: int64(30), + maxt: int64(50), + querier: storage.NoopQuerier(), }, } for i, test := range tests { - s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) - s.clients = []*Client{} - for _, readRecent := range test.readRecentClients { - c, _ := NewClient(0, &ClientConfig{ - URL: nil, - Timeout: model.Duration(30 * time.Second), - HTTPClientConfig: config.HTTPClientConfig{}, - ReadRecent: readRecent, - }) - s.clients = append(s.clients, c) - } - // overrides mergeQuerier to mockMergeQuerier so we can reflect its type - newMergeQueriers = func(queriers []storage.Querier) storage.Querier { - return &mockMergeQuerier{queriersCount: len(queriers)} + f := PreferLocalStorageFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return testQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + func() (int64, error) { return test.localStartTime, nil }, + ) + + q, err := f.Querier(ctx, test.mint, test.maxt) + if err != nil { + t.Fatal(err) } - querier, _ := s.Querier(context.Background(), test.mint, test.maxt) - actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount - - if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) { - t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount) + if test.querier != q { + t.Errorf("%d. expected quierer %+v, got %+v", i, test.querier, q) } } } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index bb8c173720..b07c346388 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -14,11 +14,13 @@ package remote import ( + "context" "sync" "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" ) // Callback func that return the oldest timestamp stored in a storage. @@ -34,9 +36,8 @@ type Storage struct { queues []*QueueManager // For reads - clients []*Client + queryables []storage.Queryable localStartTimeCallback startTimeCallback - externalLabels model.LabelSet } // NewStorage returns a remote.Storage. @@ -86,22 +87,25 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { // Update read clients - clients := []*Client{} + s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) for i, rrConf := range conf.RemoteReadConfigs { c, err := NewClient(i, &ClientConfig{ URL: rrConf.URL, Timeout: rrConf.RemoteTimeout, HTTPClientConfig: rrConf.HTTPClientConfig, - ReadRecent: rrConf.ReadRecent, }) if err != nil { return err } - clients = append(clients, c) - } - s.clients = clients - s.externalLabels = conf.GlobalConfig.ExternalLabels + var q storage.Queryable + q = QueryableClient(c) + q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels) + if !rrConf.ReadRecent { + q = PreferLocalStorageFilter(q, s.localStartTimeCallback) + } + s.queryables = append(s.queryables, q) + } return nil } @@ -111,6 +115,24 @@ func (s *Storage) StartTime() (int64, error) { return int64(model.Latest), nil } +// Querier returns a storage.MergeQuerier combining the remote client queriers +// of each configured remote read endpoint. +func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + s.mtx.Lock() + queryables := s.queryables + s.mtx.Unlock() + + queriers := make([]storage.Querier, 0, len(queryables)) + for _, queryable := range queryables { + q, err := queryable.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + queriers = append(queriers, q) + } + return storage.NewMergeQuerier(queriers), nil +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() From 7098c5647447a4510b76efd5c3334fe806d7789c Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Sun, 12 Nov 2017 02:23:20 +0100 Subject: [PATCH 3/3] Add remote read filter option For special remote read endpoints which have only data for specific queries, it is desired to limit the number of queries sent to the configured remote read endpoint to reduce latency and performance overhead. --- config/config.go | 4 + config/config_test.go | 7 +- config/testdata/conf.good.yml | 2 + docs/configuration/configuration.md | 5 ++ storage/remote/read.go | 41 +++++++++ storage/remote/read_test.go | 129 +++++++++++++++++++++++++++- storage/remote/storage.go | 16 ++++ 7 files changed, 197 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 8a295b8961..0e9eadb694 100644 --- a/config/config.go +++ b/config/config.go @@ -1496,6 +1496,10 @@ type RemoteReadConfig struct { // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig HTTPClientConfig `yaml:",inline"` + // RequiredMatchers is an optional list of equality matchers which have to + // be present in a selector to query the remote read endpoint. + RequiredMatchers model.LabelSet `yaml:"required_matchers,omitempty"` + // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } diff --git a/config/config_test.go b/config/config_test.go index 8ae364e28a..f99e85f353 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -82,9 +82,10 @@ var expectedConf = &Config{ ReadRecent: true, }, { - URL: mustParseURL("http://remote3/read"), - RemoteTimeout: model.Duration(1 * time.Minute), - ReadRecent: false, + URL: mustParseURL("http://remote3/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: false, + RequiredMatchers: model.LabelSet{"job": "special"}, }, }, diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index a65a401ac9..a282c1ec28 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -25,6 +25,8 @@ remote_read: read_recent: true - url: http://remote3/read read_recent: false + required_matchers: + job: special scrape_configs: - job_name: prometheus diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index bd102d1518..4380c1955b 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1134,6 +1134,11 @@ likely in future releases. # The URL of the endpoint to query from. url: +# An optional list of equality matchers which have to be +# present in a selector to query the remote read endpoint. +required_matchers: + [ : ... ] + # Timeout for requests to the remote read endpoint. [ remote_timeout: | default = 30s ] diff --git a/storage/remote/read.go b/storage/remote/read.go index c3753c3780..be87c3f6fe 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -119,6 +119,47 @@ func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) stor }) } +// RequiredMatchersFilter returns a storage.Queryable which creates a +// requiredMatchersQuerier. +func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil + }) +} + +// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls +// to match the given labelSet. +type requiredMatchersQuerier struct { + storage.Querier + + requiredMatchers []*labels.Matcher +} + +// Select returns a NoopSeriesSet if the given matchers don't match the label +// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. +func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { + ms := q.requiredMatchers + for _, m := range matchers { + for i, r := range ms { + if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { + ms = append(ms[:i], ms[i+1:]...) + break + } + } + if len(ms) == 0 { + break + } + } + if len(ms) > 0 { + return storage.NoopSeriesSet() + } + return q.Querier.Select(matchers...) +} + // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index e8ba06f23f..0bcdc45c76 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -143,13 +143,21 @@ func TestSeriesSetFilter(t *testing.T) { } } -type testQuerier struct { +type mockQuerier struct { ctx context.Context mint, maxt int64 storage.Querier } +type mockSeriesSet struct { + storage.SeriesSet +} + +func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet { + return mockSeriesSet{} +} + func TestPreferLocalStorageFilter(t *testing.T) { ctx := context.Background() @@ -163,13 +171,13 @@ func TestPreferLocalStorageFilter(t *testing.T) { localStartTime: int64(100), mint: int64(0), maxt: int64(50), - querier: testQuerier{ctx: ctx, mint: 0, maxt: 50}, + querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, }, { localStartTime: int64(20), mint: int64(0), maxt: int64(50), - querier: testQuerier{ctx: ctx, mint: 0, maxt: 20}, + querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20}, }, { localStartTime: int64(20), @@ -182,7 +190,7 @@ func TestPreferLocalStorageFilter(t *testing.T) { for i, test := range tests { f := PreferLocalStorageFilter( storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return testQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil }), func() (int64, error) { return test.localStartTime, nil }, ) @@ -197,3 +205,116 @@ func TestPreferLocalStorageFilter(t *testing.T) { } } } + +func TestRequiredMatchersFilter(t *testing.T) { + ctx := context.Background() + + f := RequiredMatchersFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")}, + ) + + want := &requiredMatchersQuerier{ + Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, + requiredMatchers: []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")}, + } + have, err := f.Querier(ctx, 0, 50) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(want, have) { + t.Errorf("expected quierer %+v, got %+v", want, have) + } +} + +func TestRequiredLabelsQuerierSelect(t *testing.T) { + tests := []struct { + requiredMatchers []*labels.Matcher + matchers []*labels.Matcher + seriesSet storage.SeriesSet + }{ + { + requiredMatchers: []*labels.Matcher{}, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchRegexp, "special", "label"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "different"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "baz"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + seriesSet: mockSeriesSet{}, + }, + } + + for i, test := range tests { + q := &requiredMatchersQuerier{ + Querier: mockQuerier{}, + requiredMatchers: test.requiredMatchers, + } + + if want, have := test.seriesSet, q.Select(test.matchers...); want != have { + t.Errorf("%d. expected series set %+v, got %+v", i, want, have) + } + if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) { + t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i) + } + } +} diff --git a/storage/remote/storage.go b/storage/remote/storage.go index b07c346388..7fcb60485f 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -101,6 +102,9 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { var q storage.Queryable q = QueryableClient(c) q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels) + if len(rrConf.RequiredMatchers) > 0 { + q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) + } if !rrConf.ReadRecent { q = PreferLocalStorageFilter(q, s.localStartTimeCallback) } @@ -144,3 +148,15 @@ func (s *Storage) Close() error { return nil } + +func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher { + ms := make([]*labels.Matcher, 0, len(ls)) + for k, v := range ls { + ms = append(ms, &labels.Matcher{ + Type: labels.MatchEqual, + Name: string(k), + Value: string(v), + }) + } + return ms +}