mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 13:14:05 -08:00
Limit concurrent remote reads. (#4656)
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
parent
d3a1ff1abf
commit
4c52400708
|
@ -169,7 +169,10 @@ func main() {
|
|||
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
|
||||
|
||||
a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit.").
|
||||
Default("5e7").IntVar(&cfg.web.RemoteReadLimit)
|
||||
Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit)
|
||||
|
||||
a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit.").
|
||||
Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit)
|
||||
|
||||
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert.").
|
||||
Default("1h").SetValue(&cfg.outageTolerance)
|
||||
|
|
48
pkg/gate/gate.go
Normal file
48
pkg/gate/gate.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gate
|
||||
|
||||
import "context"
|
||||
|
||||
// A Gate controls the maximum number of concurrently running and waiting queries.
|
||||
type Gate struct {
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
// NewGate returns a query gate that limits the number of queries
|
||||
// being concurrently executed.
|
||||
func New(length int) *Gate {
|
||||
return &Gate{
|
||||
ch: make(chan struct{}, length),
|
||||
}
|
||||
}
|
||||
|
||||
// Start blocks until the gate has a free spot or the context is done.
|
||||
func (g *Gate) Start(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case g.ch <- struct{}{}:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Done releases a single spot in the gate.
|
||||
func (g *Gate) Done() {
|
||||
select {
|
||||
case <-g.ch:
|
||||
default:
|
||||
panic("gate.Done: more operations done than started")
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import (
|
|||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/pkg/gate"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/pkg/value"
|
||||
|
@ -148,27 +149,30 @@ func (q *query) Exec(ctx context.Context) *Result {
|
|||
func contextDone(ctx context.Context, env string) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := ctx.Err()
|
||||
switch err {
|
||||
case context.Canceled:
|
||||
return ErrQueryCanceled(env)
|
||||
case context.DeadlineExceeded:
|
||||
return ErrQueryTimeout(env)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
return contextErr(ctx.Err(), env)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func contextErr(err error, env string) error {
|
||||
switch err {
|
||||
case context.Canceled:
|
||||
return ErrQueryCanceled(env)
|
||||
case context.DeadlineExceeded:
|
||||
return ErrQueryTimeout(env)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Engine handles the lifetime of queries from beginning to end.
|
||||
// It is connected to a querier.
|
||||
type Engine struct {
|
||||
logger log.Logger
|
||||
metrics *engineMetrics
|
||||
timeout time.Duration
|
||||
gate *queryGate
|
||||
gate *gate.Gate
|
||||
}
|
||||
|
||||
// NewEngine returns a new engine.
|
||||
|
@ -232,7 +236,7 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int,
|
|||
)
|
||||
}
|
||||
return &Engine{
|
||||
gate: newQueryGate(maxConcurrent),
|
||||
gate: gate.New(maxConcurrent),
|
||||
timeout: timeout,
|
||||
logger: logger,
|
||||
metrics: metrics,
|
||||
|
@ -317,7 +321,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
|
|||
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
|
||||
|
||||
if err := ng.gate.Start(ctx); err != nil {
|
||||
return nil, err
|
||||
return nil, contextErr(err, "query queue")
|
||||
}
|
||||
defer ng.gate.Done()
|
||||
|
||||
|
@ -1714,38 +1718,6 @@ func shouldDropMetricName(op ItemType) bool {
|
|||
// series is considered stale.
|
||||
var LookbackDelta = 5 * time.Minute
|
||||
|
||||
// A queryGate controls the maximum number of concurrently running and waiting queries.
|
||||
type queryGate struct {
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
// newQueryGate returns a query gate that limits the number of queries
|
||||
// being concurrently executed.
|
||||
func newQueryGate(length int) *queryGate {
|
||||
return &queryGate{
|
||||
ch: make(chan struct{}, length),
|
||||
}
|
||||
}
|
||||
|
||||
// Start blocks until the gate has a free spot or the context is done.
|
||||
func (g *queryGate) Start(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return contextDone(ctx, "query queue")
|
||||
case g.ch <- struct{}{}:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Done releases a single spot in the gate.
|
||||
func (g *queryGate) Done() {
|
||||
select {
|
||||
case <-g.ch:
|
||||
default:
|
||||
panic("engine.queryGate.Done: more operations done than started")
|
||||
}
|
||||
}
|
||||
|
||||
// documentedType returns the internal type to the equivalent
|
||||
// user facing terminology as defined in the documentation.
|
||||
func documentedType(t ValueType) string {
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"github.com/prometheus/tsdb"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/gate"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/textparse"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
|
@ -131,10 +132,11 @@ type API struct {
|
|||
flagsMap map[string]string
|
||||
ready func(http.HandlerFunc) http.HandlerFunc
|
||||
|
||||
db func() *tsdb.DB
|
||||
enableAdmin bool
|
||||
logger log.Logger
|
||||
remoteReadLimit int
|
||||
db func() *tsdb.DB
|
||||
enableAdmin bool
|
||||
logger log.Logger
|
||||
remoteReadSampleLimit int
|
||||
remoteReadGate *gate.Gate
|
||||
}
|
||||
|
||||
// NewAPI returns an initialized API type.
|
||||
|
@ -150,7 +152,8 @@ func NewAPI(
|
|||
enableAdmin bool,
|
||||
logger log.Logger,
|
||||
rr rulesRetriever,
|
||||
remoteReadLimit int,
|
||||
remoteReadSampleLimit int,
|
||||
remoteReadConcurrencyLimit int,
|
||||
) *API {
|
||||
return &API{
|
||||
QueryEngine: qe,
|
||||
|
@ -158,15 +161,16 @@ func NewAPI(
|
|||
targetRetriever: tr,
|
||||
alertmanagerRetriever: ar,
|
||||
|
||||
now: time.Now,
|
||||
config: configFunc,
|
||||
flagsMap: flagsMap,
|
||||
ready: readyFunc,
|
||||
db: db,
|
||||
enableAdmin: enableAdmin,
|
||||
rulesRetriever: rr,
|
||||
remoteReadLimit: remoteReadLimit,
|
||||
logger: logger,
|
||||
now: time.Now,
|
||||
config: configFunc,
|
||||
flagsMap: flagsMap,
|
||||
ready: readyFunc,
|
||||
db: db,
|
||||
enableAdmin: enableAdmin,
|
||||
rulesRetriever: rr,
|
||||
remoteReadSampleLimit: remoteReadSampleLimit,
|
||||
remoteReadGate: gate.New(remoteReadConcurrencyLimit),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -751,6 +755,9 @@ func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) {
|
|||
}
|
||||
|
||||
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||
api.remoteReadGate.Start(r.Context())
|
||||
defer api.remoteReadGate.Done()
|
||||
|
||||
req, err := remote.DecodeReadRequest(r)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
|
@ -798,7 +805,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadLimit)
|
||||
resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
|
||||
if err != nil {
|
||||
if httpErr, ok := err.(remote.HTTPError); ok {
|
||||
http.Error(w, httpErr.Error(), httpErr.Status())
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/prometheus/common/route"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/gate"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
|
@ -834,7 +835,8 @@ func TestReadEndpoint(t *testing.T) {
|
|||
},
|
||||
}
|
||||
},
|
||||
remoteReadLimit: 1e6,
|
||||
remoteReadSampleLimit: 1e6,
|
||||
remoteReadGate: gate.New(1),
|
||||
}
|
||||
|
||||
// Encode the request.
|
||||
|
|
28
web/web.go
28
web/web.go
|
@ -156,18 +156,19 @@ type Options struct {
|
|||
Version *PrometheusVersion
|
||||
Flags map[string]string
|
||||
|
||||
ListenAddress string
|
||||
ReadTimeout time.Duration
|
||||
MaxConnections int
|
||||
ExternalURL *url.URL
|
||||
RoutePrefix string
|
||||
UseLocalAssets bool
|
||||
UserAssetsPath string
|
||||
ConsoleTemplatesPath string
|
||||
ConsoleLibrariesPath string
|
||||
EnableLifecycle bool
|
||||
EnableAdminAPI bool
|
||||
RemoteReadLimit int
|
||||
ListenAddress string
|
||||
ReadTimeout time.Duration
|
||||
MaxConnections int
|
||||
ExternalURL *url.URL
|
||||
RoutePrefix string
|
||||
UseLocalAssets bool
|
||||
UserAssetsPath string
|
||||
ConsoleTemplatesPath string
|
||||
ConsoleLibrariesPath string
|
||||
EnableLifecycle bool
|
||||
EnableAdminAPI bool
|
||||
RemoteReadSampleLimit int
|
||||
RemoteReadConcurrencyLimit int
|
||||
}
|
||||
|
||||
func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
|
||||
|
@ -228,7 +229,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
h.options.EnableAdminAPI,
|
||||
logger,
|
||||
h.ruleManager,
|
||||
h.options.RemoteReadLimit,
|
||||
h.options.RemoteReadSampleLimit,
|
||||
h.options.RemoteReadConcurrencyLimit,
|
||||
)
|
||||
|
||||
if o.RoutePrefix != "/" {
|
||||
|
|
Loading…
Reference in a new issue