diff --git a/config/config.go b/config/config.go index 8a295b8961..0e9eadb694 100644 --- a/config/config.go +++ b/config/config.go @@ -1496,6 +1496,10 @@ type RemoteReadConfig struct { // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig HTTPClientConfig `yaml:",inline"` + // RequiredMatchers is an optional list of equality matchers which have to + // be present in a selector to query the remote read endpoint. + RequiredMatchers model.LabelSet `yaml:"required_matchers,omitempty"` + // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } diff --git a/config/config_test.go b/config/config_test.go index 8ae364e28a..f99e85f353 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -82,9 +82,10 @@ var expectedConf = &Config{ ReadRecent: true, }, { - URL: mustParseURL("http://remote3/read"), - RemoteTimeout: model.Duration(1 * time.Minute), - ReadRecent: false, + URL: mustParseURL("http://remote3/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: false, + RequiredMatchers: model.LabelSet{"job": "special"}, }, }, diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index a65a401ac9..a282c1ec28 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -25,6 +25,8 @@ remote_read: read_recent: true - url: http://remote3/read read_recent: false + required_matchers: + job: special scrape_configs: - job_name: prometheus diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index bd102d1518..4380c1955b 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1134,6 +1134,11 @@ likely in future releases. # The URL of the endpoint to query from. url: +# An optional list of equality matchers which have to be +# present in a selector to query the remote read endpoint. +required_matchers: + [ : ... ] + # Timeout for requests to the remote read endpoint. [ remote_timeout: | default = 30s ] diff --git a/storage/interface.go b/storage/interface.go index 2b21792520..f9bfc6a27f 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -31,12 +31,11 @@ var ( // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { + Queryable + // StartTime returns the oldest timestamp stored in the storage. StartTime() (int64, error) - // Querier returns a new Querier on the storage. - Querier(ctx context.Context, mint, maxt int64) (Querier, error) - // Appender returns a new appender against the storage. Appender() (Appender, error) @@ -44,6 +43,12 @@ type Storage interface { Close() error } +// A Queryable handles queries against a storage. +type Queryable interface { + // Querier returns a new Querier on the storage. + Querier(ctx context.Context, mint, maxt int64) (Querier, error) +} + // Querier provides reading access to time series data. type Querier interface { // Select returns a set of series that matches the given label matchers. @@ -56,6 +61,15 @@ type Querier interface { Close() error } +// QueryableFunc is an adapter to allow the use of ordinary functions as +// Queryables. It follows the idea of http.HandlerFunc. +type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error) + +// Querier calls f() with the given parameters. +func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { + return f(ctx, mint, maxt) +} + // Appender provides batched appends against a storage. type Appender interface { Add(l labels.Labels, t int64, v float64) (uint64, error) diff --git a/storage/remote/client.go b/storage/remote/client.go index bba984bc8c..8da4d84522 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -37,18 +37,16 @@ const maxErrMsgLen = 256 // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - index int // Used to differentiate metrics. - url *config.URL - client *http.Client - timeout time.Duration - readRecent bool + index int // Used to differentiate clients in metrics. + url *config.URL + client *http.Client + timeout time.Duration } // ClientConfig configures a Client. type ClientConfig struct { URL *config.URL Timeout model.Duration - ReadRecent bool HTTPClientConfig config.HTTPClientConfig } @@ -60,11 +58,10 @@ func NewClient(index int, conf *ClientConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.URL, - client: httpClient, - timeout: time.Duration(conf.Timeout), - readRecent: conf.ReadRecent, + index: index, + url: conf.URL, + client: httpClient, + timeout: time.Duration(conf.Timeout), }, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 5b966acba3..0fa25f36c0 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -158,6 +158,12 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { } } +type byLabel []storage.Series + +func (a byLabel) Len() int { return len(a) } +func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } + // errSeriesSet implements storage.SeriesSet, just returning an error. type errSeriesSet struct { err error diff --git a/storage/remote/read.go b/storage/remote/read.go index c466e7ddb0..be87c3f6fe 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -21,55 +21,30 @@ import ( "github.com/prometheus/prometheus/storage" ) -// Querier returns a new Querier on the storage. -func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - r.mtx.Lock() - defer r.mtx.Unlock() - - queriers := make([]storage.Querier, 0, len(r.clients)) - localStartTime, err := r.localStartTimeCallback() - if err != nil { - return nil, err - } - for _, c := range r.clients { - cmaxt := maxt - if !c.readRecent { - // Avoid queries whose timerange is later than the first timestamp in local DB. - if mint > localStartTime { - continue - } - // Query only samples older than the first timestamp in local DB. - if maxt > localStartTime { - cmaxt = localStartTime - } - } - queriers = append(queriers, &querier{ - ctx: ctx, - mint: mint, - maxt: cmaxt, - client: c, - externalLabels: r.externalLabels, - }) - } - return newMergeQueriers(queriers), nil +// QueryableClient returns a storage.Queryable which queries the given +// Client to select series sets. +func QueryableClient(c *Client) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &querier{ + ctx: ctx, + mint: mint, + maxt: maxt, + client: c, + }, nil + }) } -// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed. -var newMergeQueriers = storage.NewMergeQuerier - -// Querier is an adapter to make a Client usable as a storage.Querier. +// querier is an adapter to make a Client usable as a storage.Querier. type querier struct { - ctx context.Context - mint, maxt int64 - client *Client - externalLabels model.LabelSet + ctx context.Context + mint, maxt int64 + client *Client } -// Select returns a set of series that matches the given label matchers. +// Select implements storage.Querier and uses the given matchers to read series +// sets from the Client. func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { - m, added := q.addExternalLabels(matchers) - - query, err := ToQuery(q.mint, q.maxt, m) + query, err := ToQuery(q.mint, q.maxt, matchers) if err != nil { return errSeriesSet{err: err} } @@ -79,40 +54,124 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { return errSeriesSet{err: err} } - seriesSet := FromQueryResult(res) - - return newSeriesSetFilter(seriesSet, added) + return FromQueryResult(res) } -type byLabel []storage.Series - -func (a byLabel) Len() int { return len(a) } -func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } - -// LabelValues returns all potential values for a label name. +// LabelValues implements storage.Querier and is a noop. func (q *querier) LabelValues(name string) ([]string, error) { // TODO implement? return nil, nil } -// Close releases the resources of the Querier. +// Close implements storage.Querier and is a noop. func (q *querier) Close() error { return nil } +// ExternablLabelsHandler returns a storage.Queryable which creates a +// externalLabelsQuerier. +func ExternablLabelsHandler(next storage.Queryable, externalLabels model.LabelSet) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil + }) +} + +// externalLabelsQuerier is a querier which ensures that Select() results match +// the configured external labels. +type externalLabelsQuerier struct { + storage.Querier + + externalLabels model.LabelSet +} + +// Select adds equality matchers for all external labels to the list of matchers +// before calling the wrapped storage.Queryable. The added external labels are +// removed from the returned series sets. +func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { + m, added := q.addExternalLabels(matchers) + s := q.Querier.Select(m...) + return newSeriesSetFilter(s, added) +} + +// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier +// if requested timeframe can be answered completely by the local TSDB, and +// reduces maxt if the timeframe can be partially answered by TSDB. +func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + localStartTime, err := cb() + if err != nil { + return nil, err + } + cmaxt := maxt + // Avoid queries whose timerange is later than the first timestamp in local DB. + if mint > localStartTime { + return storage.NoopQuerier(), nil + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + return next.Querier(ctx, mint, cmaxt) + }) +} + +// RequiredMatchersFilter returns a storage.Queryable which creates a +// requiredMatchersQuerier. +func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil + }) +} + +// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls +// to match the given labelSet. +type requiredMatchersQuerier struct { + storage.Querier + + requiredMatchers []*labels.Matcher +} + +// Select returns a NoopSeriesSet if the given matchers don't match the label +// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. +func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { + ms := q.requiredMatchers + for _, m := range matchers { + for i, r := range ms { + if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { + ms = append(ms[:i], ms[i+1:]...) + break + } + } + if len(ms) == 0 { + break + } + } + if len(ms) > 0 { + return storage.NoopSeriesSet() + } + return q.Querier.Select(matchers...) +} + // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. // We return the new set of matchers, along with a map of labels for which // matchers were added, so that these can later be removed from the result // time series again. -func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { +func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { el := make(model.LabelSet, len(q.externalLabels)) for k, v := range q.externalLabels { el[k] = v } - for _, m := range matchers { + for _, m := range ms { if _, ok := el[model.LabelName(m.Name)]; ok { delete(el, model.LabelName(m.Name)) } @@ -122,9 +181,9 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match if err != nil { panic(err) } - matchers = append(matchers, m) + ms = append(ms, m) } - return matchers, el + return ms, el } func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 2fce331897..0bcdc45c76 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -18,10 +18,8 @@ import ( "reflect" "sort" "testing" - "time" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -35,7 +33,22 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher return m } -func TestAddExternalLabels(t *testing.T) { +func TestExternalLabelsQuerierSelect(t *testing.T) { + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + } + q := &externalLabelsQuerier{ + Querier: mockQuerier{}, + externalLabels: model.LabelSet{"region": "europe"}, + } + + want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) + if have := q.Select(matchers...); !reflect.DeepEqual(want, have) { + t.Errorf("expected series set %+v, got %+v", want, have) + } +} + +func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { tests := []struct { el model.LabelSet inMatchers []*labels.Matcher @@ -80,10 +93,7 @@ func TestAddExternalLabels(t *testing.T) { } for i, test := range tests { - q := querier{ - externalLabels: test.el, - } - + q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el} matchers, added := q.addExternalLabels(test.inMatchers) sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) @@ -133,58 +143,178 @@ func TestSeriesSetFilter(t *testing.T) { } } -type mockMergeQuerier struct{ queriersCount int } +type mockQuerier struct { + ctx context.Context + mint, maxt int64 -func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil } -func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil } -func (*mockMergeQuerier) Close() error { return nil } + storage.Querier +} + +type mockSeriesSet struct { + storage.SeriesSet +} + +func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet { + return mockSeriesSet{} +} + +func TestPreferLocalStorageFilter(t *testing.T) { + ctx := context.Background() -func TestRemoteStorageQuerier(t *testing.T) { tests := []struct { - localStartTime int64 - readRecentClients []bool - mint int64 - maxt int64 - expectedQueriersCount int + localStartTime int64 + mint int64 + maxt int64 + querier storage.Querier }{ { - localStartTime: int64(20), - readRecentClients: []bool{true, true, false}, - mint: int64(0), - maxt: int64(50), - expectedQueriersCount: 3, + localStartTime: int64(100), + mint: int64(0), + maxt: int64(50), + querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, }, { - localStartTime: int64(20), - readRecentClients: []bool{true, true, false}, - mint: int64(30), - maxt: int64(50), - expectedQueriersCount: 2, + localStartTime: int64(20), + mint: int64(0), + maxt: int64(50), + querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20}, + }, + { + localStartTime: int64(20), + mint: int64(30), + maxt: int64(50), + querier: storage.NoopQuerier(), }, } for i, test := range tests { - s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) - s.clients = []*Client{} - for _, readRecent := range test.readRecentClients { - c, _ := NewClient(0, &ClientConfig{ - URL: nil, - Timeout: model.Duration(30 * time.Second), - HTTPClientConfig: config.HTTPClientConfig{}, - ReadRecent: readRecent, - }) - s.clients = append(s.clients, c) - } - // overrides mergeQuerier to mockMergeQuerier so we can reflect its type - newMergeQueriers = func(queriers []storage.Querier) storage.Querier { - return &mockMergeQuerier{queriersCount: len(queriers)} + f := PreferLocalStorageFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + func() (int64, error) { return test.localStartTime, nil }, + ) + + q, err := f.Querier(ctx, test.mint, test.maxt) + if err != nil { + t.Fatal(err) } - querier, _ := s.Querier(context.Background(), test.mint, test.maxt) - actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount - - if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) { - t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount) + if test.querier != q { + t.Errorf("%d. expected quierer %+v, got %+v", i, test.querier, q) + } + } +} + +func TestRequiredMatchersFilter(t *testing.T) { + ctx := context.Background() + + f := RequiredMatchersFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")}, + ) + + want := &requiredMatchersQuerier{ + Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, + requiredMatchers: []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")}, + } + have, err := f.Querier(ctx, 0, 50) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(want, have) { + t.Errorf("expected quierer %+v, got %+v", want, have) + } +} + +func TestRequiredLabelsQuerierSelect(t *testing.T) { + tests := []struct { + requiredMatchers []*labels.Matcher + matchers []*labels.Matcher + seriesSet storage.SeriesSet + }{ + { + requiredMatchers: []*labels.Matcher{}, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchRegexp, "special", "label"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "different"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "baz"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + seriesSet: mockSeriesSet{}, + }, + } + + for i, test := range tests { + q := &requiredMatchersQuerier{ + Querier: mockQuerier{}, + requiredMatchers: test.requiredMatchers, + } + + if want, have := test.seriesSet, q.Select(test.matchers...); want != have { + t.Errorf("%d. expected series set %+v, got %+v", i, want, have) + } + if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) { + t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i) } } } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index bb8c173720..7fcb60485f 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -14,11 +14,14 @@ package remote import ( + "context" "sync" "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" ) // Callback func that return the oldest timestamp stored in a storage. @@ -34,9 +37,8 @@ type Storage struct { queues []*QueueManager // For reads - clients []*Client + queryables []storage.Queryable localStartTimeCallback startTimeCallback - externalLabels model.LabelSet } // NewStorage returns a remote.Storage. @@ -86,22 +88,28 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { // Update read clients - clients := []*Client{} + s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) for i, rrConf := range conf.RemoteReadConfigs { c, err := NewClient(i, &ClientConfig{ URL: rrConf.URL, Timeout: rrConf.RemoteTimeout, HTTPClientConfig: rrConf.HTTPClientConfig, - ReadRecent: rrConf.ReadRecent, }) if err != nil { return err } - clients = append(clients, c) - } - s.clients = clients - s.externalLabels = conf.GlobalConfig.ExternalLabels + var q storage.Queryable + q = QueryableClient(c) + q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels) + if len(rrConf.RequiredMatchers) > 0 { + q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) + } + if !rrConf.ReadRecent { + q = PreferLocalStorageFilter(q, s.localStartTimeCallback) + } + s.queryables = append(s.queryables, q) + } return nil } @@ -111,6 +119,24 @@ func (s *Storage) StartTime() (int64, error) { return int64(model.Latest), nil } +// Querier returns a storage.MergeQuerier combining the remote client queriers +// of each configured remote read endpoint. +func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + s.mtx.Lock() + queryables := s.queryables + s.mtx.Unlock() + + queriers := make([]storage.Querier, 0, len(queryables)) + for _, queryable := range queryables { + q, err := queryable.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + queriers = append(queriers, q) + } + return storage.NewMergeQuerier(queriers), nil +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() @@ -122,3 +148,15 @@ func (s *Storage) Close() error { return nil } + +func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher { + ms := make([]*labels.Matcher, 0, len(ls)) + for k, v := range ls { + ms = append(ms, &labels.Matcher{ + Type: labels.MatchEqual, + Name: string(k), + Value: string(v), + }) + } + return ms +}