mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Limit the number of samples remote read can return. (#4532)
* Limit the number of samples remote read can return. - Return 413 entity too large. - Limit can be set be a flag. Allow 0 to mean no limit. - Include limit in error message. - Set default limit to 50M (* 16 bytes = 800MB). Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
parent
ba7eb733e8
commit
457e4bb58e
|
@ -168,6 +168,9 @@ func main() {
|
||||||
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
|
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
|
||||||
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
|
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
|
||||||
|
|
||||||
|
a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit.").
|
||||||
|
Default("5e7").IntVar(&cfg.web.RemoteReadLimit)
|
||||||
|
|
||||||
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert.").
|
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert.").
|
||||||
Default("1h").SetValue(&cfg.outageTolerance)
|
Default("1h").SetValue(&cfg.outageTolerance)
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,19 @@ import (
|
||||||
// decodeReadLimit is the maximum size of a read request body in bytes.
|
// decodeReadLimit is the maximum size of a read request body in bytes.
|
||||||
const decodeReadLimit = 32 * 1024 * 1024
|
const decodeReadLimit = 32 * 1024 * 1024
|
||||||
|
|
||||||
|
type HTTPError struct {
|
||||||
|
msg string
|
||||||
|
status int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e HTTPError) Error() string {
|
||||||
|
return e.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e HTTPError) Status() int {
|
||||||
|
return e.status
|
||||||
|
}
|
||||||
|
|
||||||
// DecodeReadRequest reads a remote.Request from a http.Request.
|
// DecodeReadRequest reads a remote.Request from a http.Request.
|
||||||
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) {
|
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) {
|
||||||
compressed, err := ioutil.ReadAll(io.LimitReader(r.Body, decodeReadLimit))
|
compressed, err := ioutil.ReadAll(io.LimitReader(r.Body, decodeReadLimit))
|
||||||
|
@ -134,7 +147,8 @@ func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, *storage.Sel
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToQueryResult builds a QueryResult proto.
|
// ToQueryResult builds a QueryResult proto.
|
||||||
func ToQueryResult(ss storage.SeriesSet) (*prompb.QueryResult, error) {
|
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) {
|
||||||
|
numSamples := 0
|
||||||
resp := &prompb.QueryResult{}
|
resp := &prompb.QueryResult{}
|
||||||
for ss.Next() {
|
for ss.Next() {
|
||||||
series := ss.At()
|
series := ss.At()
|
||||||
|
@ -142,6 +156,13 @@ func ToQueryResult(ss storage.SeriesSet) (*prompb.QueryResult, error) {
|
||||||
samples := []*prompb.Sample{}
|
samples := []*prompb.Sample{}
|
||||||
|
|
||||||
for iter.Next() {
|
for iter.Next() {
|
||||||
|
numSamples++
|
||||||
|
if sampleLimit > 0 && numSamples > sampleLimit {
|
||||||
|
return nil, HTTPError{
|
||||||
|
msg: fmt.Sprintf("exceeded sample limit (%d)", sampleLimit),
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
}
|
||||||
|
}
|
||||||
ts, val := iter.At()
|
ts, val := iter.At()
|
||||||
samples = append(samples, &prompb.Sample{
|
samples = append(samples, &prompb.Sample{
|
||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
|
|
|
@ -135,7 +135,7 @@ func TestSeriesSetFilter(t *testing.T) {
|
||||||
|
|
||||||
for i, tc := range tests {
|
for i, tc := range tests {
|
||||||
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove)
|
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove)
|
||||||
have, err := ToQueryResult(filtered)
|
have, err := ToQueryResult(filtered, 1e6)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,6 +134,7 @@ type API struct {
|
||||||
db func() *tsdb.DB
|
db func() *tsdb.DB
|
||||||
enableAdmin bool
|
enableAdmin bool
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
remoteReadLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPI returns an initialized API type.
|
// NewAPI returns an initialized API type.
|
||||||
|
@ -149,12 +150,14 @@ func NewAPI(
|
||||||
enableAdmin bool,
|
enableAdmin bool,
|
||||||
logger log.Logger,
|
logger log.Logger,
|
||||||
rr rulesRetriever,
|
rr rulesRetriever,
|
||||||
|
remoteReadLimit int,
|
||||||
) *API {
|
) *API {
|
||||||
return &API{
|
return &API{
|
||||||
QueryEngine: qe,
|
QueryEngine: qe,
|
||||||
Queryable: q,
|
Queryable: q,
|
||||||
targetRetriever: tr,
|
targetRetriever: tr,
|
||||||
alertmanagerRetriever: ar,
|
alertmanagerRetriever: ar,
|
||||||
|
|
||||||
now: time.Now,
|
now: time.Now,
|
||||||
config: configFunc,
|
config: configFunc,
|
||||||
flagsMap: flagsMap,
|
flagsMap: flagsMap,
|
||||||
|
@ -162,6 +165,7 @@ func NewAPI(
|
||||||
db: db,
|
db: db,
|
||||||
enableAdmin: enableAdmin,
|
enableAdmin: enableAdmin,
|
||||||
rulesRetriever: rr,
|
rulesRetriever: rr,
|
||||||
|
remoteReadLimit: remoteReadLimit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -793,8 +797,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp.Results[i], err = remote.ToQueryResult(set)
|
resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if httpErr, ok := err.(remote.HTTPError); ok {
|
||||||
|
http.Error(w, httpErr.Error(), httpErr.Status())
|
||||||
|
return
|
||||||
|
}
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -313,7 +313,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp.Results[i], err = remote.ToQueryResult(set)
|
resp.Results[i], err = remote.ToQueryResult(set, 1e6)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
@ -833,6 +833,7 @@ func TestReadEndpoint(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
remoteReadLimit: 1e6,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode the request.
|
// Encode the request.
|
||||||
|
@ -861,6 +862,10 @@ func TestReadEndpoint(t *testing.T) {
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
api.remoteRead(recorder, request)
|
api.remoteRead(recorder, request)
|
||||||
|
|
||||||
|
if recorder.Code/100 != 2 {
|
||||||
|
t.Fatal(recorder.Code)
|
||||||
|
}
|
||||||
|
|
||||||
// Decode the response.
|
// Decode the response.
|
||||||
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -167,6 +167,7 @@ type Options struct {
|
||||||
ConsoleLibrariesPath string
|
ConsoleLibrariesPath string
|
||||||
EnableLifecycle bool
|
EnableLifecycle bool
|
||||||
EnableAdminAPI bool
|
EnableAdminAPI bool
|
||||||
|
RemoteReadLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
|
func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
|
||||||
|
@ -227,6 +228,7 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
h.options.EnableAdminAPI,
|
h.options.EnableAdminAPI,
|
||||||
logger,
|
logger,
|
||||||
h.ruleManager,
|
h.ruleManager,
|
||||||
|
h.options.RemoteReadLimit,
|
||||||
)
|
)
|
||||||
|
|
||||||
if o.RoutePrefix != "/" {
|
if o.RoutePrefix != "/" {
|
||||||
|
|
Loading…
Reference in a new issue