From 4c524007087ef50f02a396e1f4529340ee81783f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 25 Sep 2018 20:07:34 +0100 Subject: [PATCH] Limit concurrent remote reads. (#4656) Signed-off-by: Tom Wilkie --- cmd/prometheus/main.go | 5 +++- pkg/gate/gate.go | 48 +++++++++++++++++++++++++++++++++ promql/engine.go | 60 +++++++++++------------------------------- web/api/v1/api.go | 37 +++++++++++++++----------- web/api/v1/api_test.go | 4 ++- web/web.go | 28 +++++++++++--------- 6 files changed, 108 insertions(+), 74 deletions(-) create mode 100644 pkg/gate/gate.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index bc50e678d..9a940ab5d 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -169,7 +169,10 @@ func main() { Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit."). - Default("5e7").IntVar(&cfg.web.RemoteReadLimit) + Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit) + + a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit."). + Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit) a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert."). Default("1h").SetValue(&cfg.outageTolerance) diff --git a/pkg/gate/gate.go b/pkg/gate/gate.go new file mode 100644 index 000000000..3d4fddb75 --- /dev/null +++ b/pkg/gate/gate.go @@ -0,0 +1,48 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gate + +import "context" + +// A Gate controls the maximum number of concurrently running and waiting queries. +type Gate struct { + ch chan struct{} +} + +// NewGate returns a query gate that limits the number of queries +// being concurrently executed. +func New(length int) *Gate { + return &Gate{ + ch: make(chan struct{}, length), + } +} + +// Start blocks until the gate has a free spot or the context is done. +func (g *Gate) Start(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case g.ch <- struct{}{}: + return nil + } +} + +// Done releases a single spot in the gate. +func (g *Gate) Done() { + select { + case <-g.ch: + default: + panic("gate.Done: more operations done than started") + } +} diff --git a/promql/engine.go b/promql/engine.go index 90fc78e8a..e8ca8b7eb 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -30,6 +30,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -148,27 +149,30 @@ func (q *query) Exec(ctx context.Context) *Result { func contextDone(ctx context.Context, env string) error { select { case <-ctx.Done(): - err := ctx.Err() - switch err { - case context.Canceled: - return ErrQueryCanceled(env) - case context.DeadlineExceeded: - return ErrQueryTimeout(env) - default: - return err - } + return contextErr(ctx.Err(), env) default: return nil } } +func contextErr(err error, env string) error { + switch err { + case context.Canceled: + return ErrQueryCanceled(env) + case context.DeadlineExceeded: + return ErrQueryTimeout(env) + default: + return err + } +} + // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { logger log.Logger metrics *engineMetrics timeout time.Duration - gate *queryGate + gate *gate.Gate } // NewEngine returns a new engine. @@ -232,7 +236,7 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, ) } return &Engine{ - gate: newQueryGate(maxConcurrent), + gate: gate.New(maxConcurrent), timeout: timeout, logger: logger, metrics: metrics, @@ -317,7 +321,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) if err := ng.gate.Start(ctx); err != nil { - return nil, err + return nil, contextErr(err, "query queue") } defer ng.gate.Done() @@ -1714,38 +1718,6 @@ func shouldDropMetricName(op ItemType) bool { // series is considered stale. var LookbackDelta = 5 * time.Minute -// A queryGate controls the maximum number of concurrently running and waiting queries. -type queryGate struct { - ch chan struct{} -} - -// newQueryGate returns a query gate that limits the number of queries -// being concurrently executed. -func newQueryGate(length int) *queryGate { - return &queryGate{ - ch: make(chan struct{}, length), - } -} - -// Start blocks until the gate has a free spot or the context is done. -func (g *queryGate) Start(ctx context.Context) error { - select { - case <-ctx.Done(): - return contextDone(ctx, "query queue") - case g.ch <- struct{}{}: - return nil - } -} - -// Done releases a single spot in the gate. -func (g *queryGate) Done() { - select { - case <-g.ch: - default: - panic("engine.queryGate.Done: more operations done than started") - } -} - // documentedType returns the internal type to the equivalent // user facing terminology as defined in the documentation. func documentedType(t ValueType) string { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 71fd4ac54..db4aa407e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/tsdb" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" @@ -131,10 +132,11 @@ type API struct { flagsMap map[string]string ready func(http.HandlerFunc) http.HandlerFunc - db func() *tsdb.DB - enableAdmin bool - logger log.Logger - remoteReadLimit int + db func() *tsdb.DB + enableAdmin bool + logger log.Logger + remoteReadSampleLimit int + remoteReadGate *gate.Gate } // NewAPI returns an initialized API type. @@ -150,7 +152,8 @@ func NewAPI( enableAdmin bool, logger log.Logger, rr rulesRetriever, - remoteReadLimit int, + remoteReadSampleLimit int, + remoteReadConcurrencyLimit int, ) *API { return &API{ QueryEngine: qe, @@ -158,15 +161,16 @@ func NewAPI( targetRetriever: tr, alertmanagerRetriever: ar, - now: time.Now, - config: configFunc, - flagsMap: flagsMap, - ready: readyFunc, - db: db, - enableAdmin: enableAdmin, - rulesRetriever: rr, - remoteReadLimit: remoteReadLimit, - logger: logger, + now: time.Now, + config: configFunc, + flagsMap: flagsMap, + ready: readyFunc, + db: db, + enableAdmin: enableAdmin, + rulesRetriever: rr, + remoteReadSampleLimit: remoteReadSampleLimit, + remoteReadGate: gate.New(remoteReadConcurrencyLimit), + logger: logger, } } @@ -751,6 +755,9 @@ func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) { } func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { + api.remoteReadGate.Start(r.Context()) + defer api.remoteReadGate.Done() + req, err := remote.DecodeReadRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -798,7 +805,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadLimit) + resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit) if err != nil { if httpErr, ok := err.(remote.HTTPError); ok { http.Error(w, httpErr.Error(), httpErr.Status()) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index f2f847997..32940ff4c 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" @@ -834,7 +835,8 @@ func TestReadEndpoint(t *testing.T) { }, } }, - remoteReadLimit: 1e6, + remoteReadSampleLimit: 1e6, + remoteReadGate: gate.New(1), } // Encode the request. diff --git a/web/web.go b/web/web.go index f7bf4f7b5..4df32d1b6 100644 --- a/web/web.go +++ b/web/web.go @@ -156,18 +156,19 @@ type Options struct { Version *PrometheusVersion Flags map[string]string - ListenAddress string - ReadTimeout time.Duration - MaxConnections int - ExternalURL *url.URL - RoutePrefix string - UseLocalAssets bool - UserAssetsPath string - ConsoleTemplatesPath string - ConsoleLibrariesPath string - EnableLifecycle bool - EnableAdminAPI bool - RemoteReadLimit int + ListenAddress string + ReadTimeout time.Duration + MaxConnections int + ExternalURL *url.URL + RoutePrefix string + UseLocalAssets bool + UserAssetsPath string + ConsoleTemplatesPath string + ConsoleLibrariesPath string + EnableLifecycle bool + EnableAdminAPI bool + RemoteReadSampleLimit int + RemoteReadConcurrencyLimit int } func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { @@ -228,7 +229,8 @@ func New(logger log.Logger, o *Options) *Handler { h.options.EnableAdminAPI, logger, h.ruleManager, - h.options.RemoteReadLimit, + h.options.RemoteReadSampleLimit, + h.options.RemoteReadConcurrencyLimit, ) if o.RoutePrefix != "/" {