From 8fa18d564a85bc6e7c6c977e02f29a1cccd8f3c7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 12 Oct 2016 19:34:22 +0200 Subject: [PATCH] storage: enhance Querier interface usage This extracts Querier as an instantiateable and closeable object rather than just defining extending methods of the storage interface. This improves composability and allows abstracting query transactions, which can be useful for transaction-level caches, consistent data views, and encapsulating teardown. --- promql/engine.go | 34 ++++++++++++++++++++++------------ storage/local/interface.go | 6 +++++- storage/local/noop_storage.go | 32 ++++++++++++++++++++++---------- storage/local/storage.go | 13 +++++++++++++ web/api/v1/api.go | 16 ++++++++++++++-- web/federate.go | 9 ++++++++- 6 files changed, 84 insertions(+), 26 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 64265060a..f1d45f940 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -218,22 +218,27 @@ func contextDone(ctx context.Context, env string) error { // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { - // The querier on which the engine operates. - querier local.Querier + // A Querier constructor against an underlying storage. + queryable Queryable // The gate limiting the maximum number of concurrent and waiting queries. gate *queryGate options *EngineOptions } +// Queryable allows opening a storage querier. +type Queryable interface { + Querier() (local.Querier, error) +} + // NewEngine returns a new engine. -func NewEngine(querier local.Querier, o *EngineOptions) *Engine { +func NewEngine(queryable Queryable, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } return &Engine{ - querier: querier, - gate: newQueryGate(o.MaxConcurrentQueries), - options: o, + queryable: queryable, + gate: newQueryGate(o.MaxConcurrentQueries), + options: o, } } @@ -351,13 +356,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { + querier, err := ng.queryable.Querier() + if err != nil { + return nil, err + } + defer querier.Close() + prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - err := ng.populateIterators(ctx, s) + err = ng.populateIterators(ctx, querier, s) prepareTimer.Stop() if err != nil { return nil, err } - defer ng.closeIterators(s) evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() @@ -463,20 +473,20 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return resMatrix, nil } -func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { +func (ng *Engine) populateIterators(ctx context.Context, querier local.Querier, s *EvalStmt) error { var queryErr error Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: if s.Start.Equal(s.End) { - n.iterators, queryErr = ng.querier.QueryInstant( + n.iterators, queryErr = querier.QueryInstant( ctx, s.Start.Add(-n.Offset), StalenessDelta, n.LabelMatchers..., ) } else { - n.iterators, queryErr = ng.querier.QueryRange( + n.iterators, queryErr = querier.QueryRange( ctx, s.Start.Add(-n.Offset-StalenessDelta), s.End.Add(-n.Offset), @@ -487,7 +497,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { return false } case *MatrixSelector: - n.iterators, queryErr = ng.querier.QueryRange( + n.iterators, queryErr = querier.QueryRange( ctx, s.Start.Add(-n.Offset-n.Range), s.End.Add(-n.Offset), diff --git a/storage/local/interface.go b/storage/local/interface.go index 7e95a158b..3f1fda713 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -26,7 +26,8 @@ import ( // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { - Querier + // Querier returns a new Querier on the storage. + Querier() (Querier, error) // This SampleAppender needs multiple samples for the same fingerprint to be // submitted in chronological order, from oldest to newest. When Append has @@ -57,6 +58,9 @@ type Storage interface { // Querier allows querying a time series storage. type Querier interface { + // Close closes the querier. Behavior for subsequent calls to Querier methods + // is undefined. + Close() error // QueryRange returns a list of series iterators for the selected // time range and label matchers. The iterators need to be closed // after usage. diff --git a/storage/local/noop_storage.go b/storage/local/noop_storage.go index 77a48bbb8..360849da1 100644 --- a/storage/local/noop_storage.go +++ b/storage/local/noop_storage.go @@ -40,23 +40,35 @@ func (s *NoopStorage) Stop() error { func (s *NoopStorage) WaitForIndexing() { } -// LastSampleForLabelMatchers implements Storage. -func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { +// Querier implements Storage. +func (s *NoopStorage) Querier() (Querier, error) { + return &NoopQuerier{}, nil +} + +type NoopQuerier struct{} + +// Close implements Querier. +func (s *NoopQuerier) Close() error { + return nil +} + +// LastSampleForLabelMatchers implements Querier. +func (s *NoopQuerier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { return nil, nil } -// QueryRange implements Storage. -func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +// QueryRange implements Querier +func (s *NoopQuerier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } -// QueryInstant implements Storage. -func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +// QueryInstant implements Querier. +func (s *NoopQuerier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } -// MetricsForLabelMatchers implements Storage. -func (s *NoopStorage) MetricsForLabelMatchers( +// MetricsForLabelMatchers implements Querier. +func (s *NoopQuerier) MetricsForLabelMatchers( ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers, @@ -64,8 +76,8 @@ func (s *NoopStorage) MetricsForLabelMatchers( return nil, nil } -// LabelValuesForLabelName implements Storage. -func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { +// LabelValuesForLabelName implements Querier. +func (s *NoopQuerier) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { return nil, nil } diff --git a/storage/local/storage.go b/storage/local/storage.go index 1288389e0..16ecf0741 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -403,6 +403,19 @@ func (s *MemorySeriesStorage) Stop() error { return nil } +type memorySeriesStorageQuerier struct { + *MemorySeriesStorage +} + +func (memorySeriesStorageQuerier) Close() error { + return nil +} + +// Querier implements the storage interface. +func (s *MemorySeriesStorage) Querier() (Querier, error) { + return memorySeriesStorageQuerier{s}, nil +} + // WaitForIndexing implements Storage. func (s *MemorySeriesStorage) WaitForIndexing() { s.persistence.waitForIndexing() diff --git a/web/api/v1/api.go b/web/api/v1/api.go index fc05e160a..733d0ac14 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -226,7 +226,13 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { if !model.LabelNameRE.MatchString(name) { return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } - vals, err := api.Storage.LabelValuesForLabelName(api.context(r), model.LabelName(name)) + q, err := api.Storage.Querier() + if err != nil { + return nil, &apiError{errorExec, err} + } + defer q.Close() + + vals, err := q.LabelValuesForLabelName(api.context(r), model.LabelName(name)) if err != nil { return nil, &apiError{errorExec, err} } @@ -272,7 +278,13 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { matcherSets = append(matcherSets, matchers) } - res, err := api.Storage.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) + q, err := api.Storage.Querier() + if err != nil { + return nil, &apiError{errorExec, err} + } + defer q.Close() + + res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) if err != nil { return nil, &apiError{errorExec, err} } diff --git a/web/federate.go b/web/federate.go index 9a7d15193..960b55470 100644 --- a/web/federate.go +++ b/web/federate.go @@ -50,7 +50,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { ) w.Header().Set("Content-Type", string(format)) - vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) + q, err := h.storage.Querier() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer q.Close() + + vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return