From 83cd270ea4702349642e7861582853cd6afc4d37 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 23 Nov 2017 13:50:06 +0100 Subject: [PATCH] *: adapt to storage interface changes --- promql/engine.go | 14 ++++++++++++-- rules/manager_test.go | 11 +++++++++-- storage/fanout.go | 10 +++++++--- storage/interface.go | 2 +- storage/noop.go | 4 ++-- storage/remote/read.go | 21 ++++++++++++--------- storage/remote/read_test.go | 17 ++++++++++++----- storage/tsdb/tsdb.go | 9 ++++++--- web/api/v1/api.go | 13 +++++++++++-- web/federate.go | 8 +++++++- 10 files changed, 79 insertions(+), 30 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 4f37ced26..29e3869ad 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -517,7 +517,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: - n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...)) + set, err := querier.Select(n.LabelMatchers...) + if err != nil { + level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) + return false + } + n.series, err = expandSeriesSet(set) if err != nil { // TODO(fabxc): use multi-error. level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) @@ -529,7 +534,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q } case *MatrixSelector: - n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...)) + set, err := querier.Select(n.LabelMatchers...) + if err != nil { + level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) + return false + } + n.series, err = expandSeriesSet(set) if err != nil { level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return false diff --git a/rules/manager_test.go b/rules/manager_test.go index 4f68a9f1b..5ff947129 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -172,9 +172,16 @@ func TestStaleness(t *testing.T) { querier, err := storage.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() - matcher, _ := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") - samples, err := readSeriesSet(querier.Select(matcher)) + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") testutil.Ok(t, err) + + set, err := querier.Select(matcher) + testutil.Ok(t, err) + + samples, err := readSeriesSet(set) + testutil.Ok(t, err) + metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String() metricSample, ok := samples[metric] diff --git a/storage/fanout.go b/storage/fanout.go index 62f973044..061d993af 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -216,12 +216,16 @@ func NewMergeQuerier(queriers []Querier) Querier { } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet { +func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) for _, querier := range q.queriers { - seriesSets = append(seriesSets, querier.Select(matchers...)) + set, err := querier.Select(matchers...) + if err != nil { + return nil, err + } + seriesSets = append(seriesSets, set) } - return newMergeSeriesSet(seriesSets) + return newMergeSeriesSet(seriesSets), nil } // LabelValues returns all potential values for a label name. diff --git a/storage/interface.go b/storage/interface.go index f9bfc6a27..71261b2c9 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,7 +52,7 @@ type Queryable interface { // Querier provides reading access to time series data. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(...*labels.Matcher) SeriesSet + Select(...*labels.Matcher) (SeriesSet, error) // LabelValues returns all potential values for a label name. LabelValues(name string) ([]string, error) diff --git a/storage/noop.go b/storage/noop.go index 358cf2611..a5ff1bc9b 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -22,8 +22,8 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(...*labels.Matcher) SeriesSet { - return NoopSeriesSet() +func (noopQuerier) Select(...*labels.Matcher) (SeriesSet, error) { + return NoopSeriesSet(), nil } func (noopQuerier) LabelValues(name string) ([]string, error) { diff --git a/storage/remote/read.go b/storage/remote/read.go index be87c3f6f..e49d3524a 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -43,18 +43,18 @@ type querier struct { // Select implements storage.Querier and uses the given matchers to read series // sets from the Client. -func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { +func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) { query, err := ToQuery(q.mint, q.maxt, matchers) if err != nil { - return errSeriesSet{err: err} + return nil, err } res, err := q.client.Read(q.ctx, query) if err != nil { - return errSeriesSet{err: err} + return nil, err } - return FromQueryResult(res) + return FromQueryResult(res), nil } // LabelValues implements storage.Querier and is a noop. @@ -91,10 +91,13 @@ type externalLabelsQuerier struct { // Select adds equality matchers for all external labels to the list of matchers // before calling the wrapped storage.Queryable. The added external labels are // removed from the returned series sets. -func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { +func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) { m, added := q.addExternalLabels(matchers) - s := q.Querier.Select(m...) - return newSeriesSetFilter(s, added) + s, err := q.Querier.Select(m...) + if err != nil { + return nil, err + } + return newSeriesSetFilter(s, added), nil } // PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier @@ -141,7 +144,7 @@ type requiredMatchersQuerier struct { // Select returns a NoopSeriesSet if the given matchers don't match the label // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { +func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) { ms := q.requiredMatchers for _, m := range matchers { for i, r := range ms { @@ -155,7 +158,7 @@ func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.Ser } } if len(ms) > 0 { - return storage.NoopSeriesSet() + return storage.NoopSeriesSet(), nil } return q.Querier.Select(matchers...) } diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 0bcdc45c7..f61ab3742 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -41,9 +41,12 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { Querier: mockQuerier{}, externalLabels: model.LabelSet{"region": "europe"}, } - want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - if have := q.Select(matchers...); !reflect.DeepEqual(want, have) { + have, err := q.Select(matchers...) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(want, have) { t.Errorf("expected series set %+v, got %+v", want, have) } } @@ -154,8 +157,8 @@ type mockSeriesSet struct { storage.SeriesSet } -func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet { - return mockSeriesSet{} +func (mockQuerier) Select(...*labels.Matcher) (storage.SeriesSet, error) { + return mockSeriesSet{}, nil } func TestPreferLocalStorageFilter(t *testing.T) { @@ -310,7 +313,11 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) { requiredMatchers: test.requiredMatchers, } - if want, have := test.seriesSet, q.Select(test.matchers...); want != have { + have, err := q.Select(test.matchers...) + if err != nil { + t.Error(err) + } + if want := test.seriesSet; want != have { t.Errorf("%d. expected series set %+v, got %+v", i, want, have) } if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) { diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index f07776544..070488fbf 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -188,14 +188,17 @@ type querier struct { q tsdb.Querier } -func (q querier) Select(oms ...*labels.Matcher) storage.SeriesSet { +func (q querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) { ms := make([]tsdbLabels.Matcher, 0, len(oms)) for _, om := range oms { ms = append(ms, convertMatcher(om)) } - - return seriesSet{set: q.q.Select(ms...)} + set, err := q.q.Select(ms...) + if err != nil { + return nil, err + } + return seriesSet{set: set}, nil } func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 825e8db72..238ba8ff6 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -380,7 +380,11 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var set storage.SeriesSet for _, mset := range matcherSets { - set = storage.DeduplicateSeriesSet(set, q.Select(mset...)) + s, err := q.Select(mset...) + if err != nil { + return nil, &apiError{errorExec, err} + } + set = storage.DeduplicateSeriesSet(set, s) } metrics := []labels.Labels{} @@ -517,7 +521,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } - resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...)) + set, err := querier.Select(filteredMatchers...) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp.Results[i], err = remote.ToQueryResult(set) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/web/federate.go b/web/federate.go index 7597f6ddb..43028a3ac 100644 --- a/web/federate.go +++ b/web/federate.go @@ -75,7 +75,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { var set storage.SeriesSet for _, mset := range matcherSets { - set = storage.DeduplicateSeriesSet(set, q.Select(mset...)) + s, err := q.Select(mset...) + if err != nil { + federationErrors.Inc() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + set = storage.DeduplicateSeriesSet(set, s) } if set == nil { return