mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-20 07:47:31 -07:00
*: adapt to storage interface changes
This commit is contained in:
parent
10b2e8c637
commit
83cd270ea4
|
@ -517,7 +517,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
|
||||||
Inspect(s.Expr, func(node Node) bool {
|
Inspect(s.Expr, func(node Node) bool {
|
||||||
switch n := node.(type) {
|
switch n := node.(type) {
|
||||||
case *VectorSelector:
|
case *VectorSelector:
|
||||||
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
|
set, err := querier.Select(n.LabelMatchers...)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
n.series, err = expandSeriesSet(set)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(fabxc): use multi-error.
|
// TODO(fabxc): use multi-error.
|
||||||
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
||||||
|
@ -529,7 +534,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
|
||||||
}
|
}
|
||||||
|
|
||||||
case *MatrixSelector:
|
case *MatrixSelector:
|
||||||
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
|
set, err := querier.Select(n.LabelMatchers...)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
n.series, err = expandSeriesSet(set)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -172,9 +172,16 @@ func TestStaleness(t *testing.T) {
|
||||||
querier, err := storage.Querier(context.Background(), 0, 2000)
|
querier, err := storage.Querier(context.Background(), 0, 2000)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer querier.Close()
|
defer querier.Close()
|
||||||
matcher, _ := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
|
|
||||||
samples, err := readSeriesSet(querier.Select(matcher))
|
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
set, err := querier.Select(matcher)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
samples, err := readSeriesSet(set)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String()
|
metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String()
|
||||||
metricSample, ok := samples[metric]
|
metricSample, ok := samples[metric]
|
||||||
|
|
||||||
|
|
|
@ -216,12 +216,16 @@ func NewMergeQuerier(queriers []Querier) Querier {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select returns a set of series that matches the given label matchers.
|
// Select returns a set of series that matches the given label matchers.
|
||||||
func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
|
func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) {
|
||||||
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
||||||
for _, querier := range q.queriers {
|
for _, querier := range q.queriers {
|
||||||
seriesSets = append(seriesSets, querier.Select(matchers...))
|
set, err := querier.Select(matchers...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
seriesSets = append(seriesSets, set)
|
||||||
}
|
}
|
||||||
return newMergeSeriesSet(seriesSets)
|
return newMergeSeriesSet(seriesSets), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValues returns all potential values for a label name.
|
// LabelValues returns all potential values for a label name.
|
||||||
|
|
|
@ -52,7 +52,7 @@ type Queryable interface {
|
||||||
// Querier provides reading access to time series data.
|
// Querier provides reading access to time series data.
|
||||||
type Querier interface {
|
type Querier interface {
|
||||||
// Select returns a set of series that matches the given label matchers.
|
// Select returns a set of series that matches the given label matchers.
|
||||||
Select(...*labels.Matcher) SeriesSet
|
Select(...*labels.Matcher) (SeriesSet, error)
|
||||||
|
|
||||||
// LabelValues returns all potential values for a label name.
|
// LabelValues returns all potential values for a label name.
|
||||||
LabelValues(name string) ([]string, error)
|
LabelValues(name string) ([]string, error)
|
||||||
|
|
|
@ -22,8 +22,8 @@ func NoopQuerier() Querier {
|
||||||
return noopQuerier{}
|
return noopQuerier{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (noopQuerier) Select(...*labels.Matcher) SeriesSet {
|
func (noopQuerier) Select(...*labels.Matcher) (SeriesSet, error) {
|
||||||
return NoopSeriesSet()
|
return NoopSeriesSet(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (noopQuerier) LabelValues(name string) ([]string, error) {
|
func (noopQuerier) LabelValues(name string) ([]string, error) {
|
||||||
|
|
|
@ -43,18 +43,18 @@ type querier struct {
|
||||||
|
|
||||||
// Select implements storage.Querier and uses the given matchers to read series
|
// Select implements storage.Querier and uses the given matchers to read series
|
||||||
// sets from the Client.
|
// sets from the Client.
|
||||||
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
|
||||||
query, err := ToQuery(q.mint, q.maxt, matchers)
|
query, err := ToQuery(q.mint, q.maxt, matchers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errSeriesSet{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := q.client.Read(q.ctx, query)
|
res, err := q.client.Read(q.ctx, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errSeriesSet{err: err}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return FromQueryResult(res)
|
return FromQueryResult(res), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValues implements storage.Querier and is a noop.
|
// LabelValues implements storage.Querier and is a noop.
|
||||||
|
@ -91,10 +91,13 @@ type externalLabelsQuerier struct {
|
||||||
// Select adds equality matchers for all external labels to the list of matchers
|
// Select adds equality matchers for all external labels to the list of matchers
|
||||||
// before calling the wrapped storage.Queryable. The added external labels are
|
// before calling the wrapped storage.Queryable. The added external labels are
|
||||||
// removed from the returned series sets.
|
// removed from the returned series sets.
|
||||||
func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
|
||||||
m, added := q.addExternalLabels(matchers)
|
m, added := q.addExternalLabels(matchers)
|
||||||
s := q.Querier.Select(m...)
|
s, err := q.Querier.Select(m...)
|
||||||
return newSeriesSetFilter(s, added)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return newSeriesSetFilter(s, added), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
|
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
|
||||||
|
@ -141,7 +144,7 @@ type requiredMatchersQuerier struct {
|
||||||
|
|
||||||
// Select returns a NoopSeriesSet if the given matchers don't match the label
|
// Select returns a NoopSeriesSet if the given matchers don't match the label
|
||||||
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
|
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
|
||||||
func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
|
||||||
ms := q.requiredMatchers
|
ms := q.requiredMatchers
|
||||||
for _, m := range matchers {
|
for _, m := range matchers {
|
||||||
for i, r := range ms {
|
for i, r := range ms {
|
||||||
|
@ -155,7 +158,7 @@ func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.Ser
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(ms) > 0 {
|
if len(ms) > 0 {
|
||||||
return storage.NoopSeriesSet()
|
return storage.NoopSeriesSet(), nil
|
||||||
}
|
}
|
||||||
return q.Querier.Select(matchers...)
|
return q.Querier.Select(matchers...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,9 +41,12 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
|
||||||
Querier: mockQuerier{},
|
Querier: mockQuerier{},
|
||||||
externalLabels: model.LabelSet{"region": "europe"},
|
externalLabels: model.LabelSet{"region": "europe"},
|
||||||
}
|
}
|
||||||
|
|
||||||
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
|
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
|
||||||
if have := q.Select(matchers...); !reflect.DeepEqual(want, have) {
|
have, err := q.Select(matchers...)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(want, have) {
|
||||||
t.Errorf("expected series set %+v, got %+v", want, have)
|
t.Errorf("expected series set %+v, got %+v", want, have)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -154,8 +157,8 @@ type mockSeriesSet struct {
|
||||||
storage.SeriesSet
|
storage.SeriesSet
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet {
|
func (mockQuerier) Select(...*labels.Matcher) (storage.SeriesSet, error) {
|
||||||
return mockSeriesSet{}
|
return mockSeriesSet{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPreferLocalStorageFilter(t *testing.T) {
|
func TestPreferLocalStorageFilter(t *testing.T) {
|
||||||
|
@ -310,7 +313,11 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
|
||||||
requiredMatchers: test.requiredMatchers,
|
requiredMatchers: test.requiredMatchers,
|
||||||
}
|
}
|
||||||
|
|
||||||
if want, have := test.seriesSet, q.Select(test.matchers...); want != have {
|
have, err := q.Select(test.matchers...)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if want := test.seriesSet; want != have {
|
||||||
t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
|
t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
|
||||||
}
|
}
|
||||||
if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {
|
if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {
|
||||||
|
|
|
@ -188,14 +188,17 @@ type querier struct {
|
||||||
q tsdb.Querier
|
q tsdb.Querier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q querier) Select(oms ...*labels.Matcher) storage.SeriesSet {
|
func (q querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) {
|
||||||
ms := make([]tsdbLabels.Matcher, 0, len(oms))
|
ms := make([]tsdbLabels.Matcher, 0, len(oms))
|
||||||
|
|
||||||
for _, om := range oms {
|
for _, om := range oms {
|
||||||
ms = append(ms, convertMatcher(om))
|
ms = append(ms, convertMatcher(om))
|
||||||
}
|
}
|
||||||
|
set, err := q.q.Select(ms...)
|
||||||
return seriesSet{set: q.q.Select(ms...)}
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return seriesSet{set: set}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }
|
func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }
|
||||||
|
|
|
@ -380,7 +380,11 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
|
||||||
var set storage.SeriesSet
|
var set storage.SeriesSet
|
||||||
|
|
||||||
for _, mset := range matcherSets {
|
for _, mset := range matcherSets {
|
||||||
set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
|
s, err := q.Select(mset...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &apiError{errorExec, err}
|
||||||
|
}
|
||||||
|
set = storage.DeduplicateSeriesSet(set, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics := []labels.Labels{}
|
metrics := []labels.Labels{}
|
||||||
|
@ -517,7 +521,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...))
|
set, err := querier.Select(filteredMatchers...)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp.Results[i], err = remote.ToQueryResult(set)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|
|
@ -75,7 +75,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
||||||
var set storage.SeriesSet
|
var set storage.SeriesSet
|
||||||
|
|
||||||
for _, mset := range matcherSets {
|
for _, mset := range matcherSets {
|
||||||
set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
|
s, err := q.Select(mset...)
|
||||||
|
if err != nil {
|
||||||
|
federationErrors.Inc()
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
set = storage.DeduplicateSeriesSet(set, s)
|
||||||
}
|
}
|
||||||
if set == nil {
|
if set == nil {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue