diff --git a/promql/engine.go b/promql/engine.go index 64265060a1..f1d45f9401 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -218,22 +218,27 @@ func contextDone(ctx context.Context, env string) error { // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { - // The querier on which the engine operates. - querier local.Querier + // A Querier constructor against an underlying storage. + queryable Queryable // The gate limiting the maximum number of concurrent and waiting queries. gate *queryGate options *EngineOptions } +// Queryable allows opening a storage querier. +type Queryable interface { + Querier() (local.Querier, error) +} + // NewEngine returns a new engine. -func NewEngine(querier local.Querier, o *EngineOptions) *Engine { +func NewEngine(queryable Queryable, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } return &Engine{ - querier: querier, - gate: newQueryGate(o.MaxConcurrentQueries), - options: o, + queryable: queryable, + gate: newQueryGate(o.MaxConcurrentQueries), + options: o, } } @@ -351,13 +356,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { + querier, err := ng.queryable.Querier() + if err != nil { + return nil, err + } + defer querier.Close() + prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - err := ng.populateIterators(ctx, s) + err = ng.populateIterators(ctx, querier, s) prepareTimer.Stop() if err != nil { return nil, err } - defer ng.closeIterators(s) evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() @@ -463,20 +473,20 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return resMatrix, nil } -func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { +func (ng *Engine) populateIterators(ctx context.Context, querier local.Querier, s *EvalStmt) error { var queryErr error Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: if s.Start.Equal(s.End) { - n.iterators, queryErr = ng.querier.QueryInstant( + n.iterators, queryErr = querier.QueryInstant( ctx, s.Start.Add(-n.Offset), StalenessDelta, n.LabelMatchers..., ) } else { - n.iterators, queryErr = ng.querier.QueryRange( + n.iterators, queryErr = querier.QueryRange( ctx, s.Start.Add(-n.Offset-StalenessDelta), s.End.Add(-n.Offset), @@ -487,7 +497,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { return false } case *MatrixSelector: - n.iterators, queryErr = ng.querier.QueryRange( + n.iterators, queryErr = querier.QueryRange( ctx, s.Start.Add(-n.Offset-n.Range), s.End.Add(-n.Offset), diff --git a/storage/local/interface.go b/storage/local/interface.go index 7e95a158b4..3f1fda7130 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -26,7 +26,8 @@ import ( // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { - Querier + // Querier returns a new Querier on the storage. + Querier() (Querier, error) // This SampleAppender needs multiple samples for the same fingerprint to be // submitted in chronological order, from oldest to newest. When Append has @@ -57,6 +58,9 @@ type Storage interface { // Querier allows querying a time series storage. type Querier interface { + // Close closes the querier. Behavior for subsequent calls to Querier methods + // is undefined. + Close() error // QueryRange returns a list of series iterators for the selected // time range and label matchers. The iterators need to be closed // after usage. diff --git a/storage/local/noop_storage.go b/storage/local/noop_storage.go index 77a48bbb8c..360849da1b 100644 --- a/storage/local/noop_storage.go +++ b/storage/local/noop_storage.go @@ -40,23 +40,35 @@ func (s *NoopStorage) Stop() error { func (s *NoopStorage) WaitForIndexing() { } -// LastSampleForLabelMatchers implements Storage. -func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { +// Querier implements Storage. +func (s *NoopStorage) Querier() (Querier, error) { + return &NoopQuerier{}, nil +} + +type NoopQuerier struct{} + +// Close implements Querier. +func (s *NoopQuerier) Close() error { + return nil +} + +// LastSampleForLabelMatchers implements Querier. +func (s *NoopQuerier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { return nil, nil } -// QueryRange implements Storage. -func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +// QueryRange implements Querier +func (s *NoopQuerier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } -// QueryInstant implements Storage. -func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +// QueryInstant implements Querier. +func (s *NoopQuerier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } -// MetricsForLabelMatchers implements Storage. -func (s *NoopStorage) MetricsForLabelMatchers( +// MetricsForLabelMatchers implements Querier. +func (s *NoopQuerier) MetricsForLabelMatchers( ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers, @@ -64,8 +76,8 @@ func (s *NoopStorage) MetricsForLabelMatchers( return nil, nil } -// LabelValuesForLabelName implements Storage. -func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { +// LabelValuesForLabelName implements Querier. +func (s *NoopQuerier) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { return nil, nil } diff --git a/storage/local/storage.go b/storage/local/storage.go index 1288389e03..16ecf07411 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -403,6 +403,19 @@ func (s *MemorySeriesStorage) Stop() error { return nil } +type memorySeriesStorageQuerier struct { + *MemorySeriesStorage +} + +func (memorySeriesStorageQuerier) Close() error { + return nil +} + +// Querier implements the storage interface. +func (s *MemorySeriesStorage) Querier() (Querier, error) { + return memorySeriesStorageQuerier{s}, nil +} + // WaitForIndexing implements Storage. func (s *MemorySeriesStorage) WaitForIndexing() { s.persistence.waitForIndexing() diff --git a/web/api/v1/api.go b/web/api/v1/api.go index fc05e160a3..733d0ac147 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -226,7 +226,13 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { if !model.LabelNameRE.MatchString(name) { return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } - vals, err := api.Storage.LabelValuesForLabelName(api.context(r), model.LabelName(name)) + q, err := api.Storage.Querier() + if err != nil { + return nil, &apiError{errorExec, err} + } + defer q.Close() + + vals, err := q.LabelValuesForLabelName(api.context(r), model.LabelName(name)) if err != nil { return nil, &apiError{errorExec, err} } @@ -272,7 +278,13 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { matcherSets = append(matcherSets, matchers) } - res, err := api.Storage.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) + q, err := api.Storage.Querier() + if err != nil { + return nil, &apiError{errorExec, err} + } + defer q.Close() + + res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) if err != nil { return nil, &apiError{errorExec, err} } diff --git a/web/federate.go b/web/federate.go index 9a7d151933..960b554702 100644 --- a/web/federate.go +++ b/web/federate.go @@ -50,7 +50,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { ) w.Header().Set("Content-Type", string(format)) - vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) + q, err := h.storage.Querier() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer q.Close() + + vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return