Merge pull request #8833 from hanjm/feature/add-scape-read-body-limit

Add body_size_limit to prevent bad targets response large body cause Prometheus server OOM (#8827)
This commit is contained in:
Julien Pivotto 2021-06-02 09:24:59 +02:00 committed by GitHub
commit 20c6739adc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 121 additions and 14 deletions

View file

@ -23,6 +23,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/alecthomas/units"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -382,6 +383,9 @@ type ScrapeConfig struct {
MetricsPath string `yaml:"metrics_path,omitempty"` MetricsPath string `yaml:"metrics_path,omitempty"`
// The URL scheme with which to fetch metrics from targets. // The URL scheme with which to fetch metrics from targets.
Scheme string `yaml:"scheme,omitempty"` Scheme string `yaml:"scheme,omitempty"`
// An uncompressed response body larger than this many bytes will cause the
// scrape to fail. 0 means no limit.
BodySizeLimit units.Base2Bytes `yaml:"body_size_limit,omitempty"`
// More than this many samples post metric-relabeling will cause the scrape to // More than this many samples post metric-relabeling will cause the scrape to
// fail. // fail.
SampleLimit uint `yaml:"sample_limit,omitempty"` SampleLimit uint `yaml:"sample_limit,omitempty"`

View file

@ -23,6 +23,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/alecthomas/units"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/common/config" "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -224,6 +225,7 @@ var expectedConf = &Config{
HonorTimestamps: true, HonorTimestamps: true,
ScrapeInterval: model.Duration(50 * time.Second), ScrapeInterval: model.Duration(50 * time.Second),
ScrapeTimeout: model.Duration(5 * time.Second), ScrapeTimeout: model.Duration(5 * time.Second),
BodySizeLimit: 10 * units.MiB,
SampleLimit: 1000, SampleLimit: 1000,
HTTPClientConfig: config.HTTPClientConfig{ HTTPClientConfig: config.HTTPClientConfig{
@ -1227,6 +1229,10 @@ var expectedErrors = []struct {
filename: "scaleway_two_secrets.bad.yml", filename: "scaleway_two_secrets.bad.yml",
errMsg: "at most one of secret_key & secret_key_file must be configured", errMsg: "at most one of secret_key & secret_key_file must be configured",
}, },
{
filename: "scrape_body_size_limit.bad.yml",
errMsg: "units: unknown unit in 100",
},
} }
func TestBadConfigs(t *testing.T) { func TestBadConfigs(t *testing.T) {

View file

@ -97,6 +97,7 @@ scrape_configs:
scrape_interval: 50s scrape_interval: 50s
scrape_timeout: 5s scrape_timeout: 5s
body_size_limit: 10MB
sample_limit: 1000 sample_limit: 1000
metrics_path: /my_path metrics_path: /my_path

View file

@ -0,0 +1,3 @@
scrape_configs:
- job_name: prometheus
body_size_limit: 100

View file

@ -287,6 +287,9 @@ relabel_configs:
metric_relabel_configs: metric_relabel_configs:
[ - <relabel_config> ... ] [ - <relabel_config> ... ]
# An uncompressed response body larger than this many bytes will cause the
# scrape to fail. 0 means no limit. Example: 100MB.
[ body_size_limit: <string> | default = 0 ]
# Per-scrape limit on number of scraped samples that will be accepted. # Per-scrape limit on number of scraped samples that will be accepted.
# If more than this number of samples are present after metric relabeling # If more than this number of samples are present after metric relabeling
# the entire scrape will be treated as failed. 0 means no limit. # the entire scrape will be treated as failed. 0 means no limit.

View file

@ -54,11 +54,13 @@ local template = grafana.template;
.addPanel( .addPanel(
g.panel('Scrape failures') + g.panel('Scrape failures') +
g.queryPanel([ g.queryPanel([
'sum by (job) (rate(prometheus_target_scrapes_exceeded_body_size_limit_total[1m]))',
'sum by (job) (rate(prometheus_target_scrapes_exceeded_sample_limit_total[1m]))', 'sum by (job) (rate(prometheus_target_scrapes_exceeded_sample_limit_total[1m]))',
'sum by (job) (rate(prometheus_target_scrapes_sample_duplicate_timestamp_total[1m]))', 'sum by (job) (rate(prometheus_target_scrapes_sample_duplicate_timestamp_total[1m]))',
'sum by (job) (rate(prometheus_target_scrapes_sample_out_of_bounds_total[1m]))', 'sum by (job) (rate(prometheus_target_scrapes_sample_out_of_bounds_total[1m]))',
'sum by (job) (rate(prometheus_target_scrapes_sample_out_of_order_total[1m]))', 'sum by (job) (rate(prometheus_target_scrapes_sample_out_of_order_total[1m]))',
], [ ], [
'exceeded body size limit: {{job}}',
'exceeded sample limit: {{job}}', 'exceeded sample limit: {{job}}',
'duplicate timestamp: {{job}}', 'duplicate timestamp: {{job}}',
'out of bounds: {{job}}', 'out of bounds: {{job}}',

View file

@ -134,6 +134,12 @@ var (
}, },
[]string{"scrape_job"}, []string{"scrape_job"},
) )
targetScrapeExceededBodySizeLimit = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrapes_exceeded_body_size_limit_total",
Help: "Total number of scrapes that hit the body size limit",
},
)
targetScrapeSampleLimit = prometheus.NewCounter( targetScrapeSampleLimit = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "prometheus_target_scrapes_exceeded_sample_limit_total", Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
@ -195,6 +201,7 @@ func init() {
targetScrapePoolReloadsFailed, targetScrapePoolReloadsFailed,
targetSyncIntervalLength, targetSyncIntervalLength,
targetScrapePoolSyncsCounter, targetScrapePoolSyncsCounter,
targetScrapeExceededBodySizeLimit,
targetScrapeSampleLimit, targetScrapeSampleLimit,
targetScrapeSampleDuplicate, targetScrapeSampleDuplicate,
targetScrapeSampleOutOfOrder, targetScrapeSampleOutOfOrder,
@ -381,11 +388,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit)) targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval) interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout) timeout = time.Duration(sp.config.ScrapeTimeout)
sampleLimit = int(sp.config.SampleLimit) bodySizeLimit = int64(sp.config.BodySizeLimit)
labelLimits = &labelLimits{ sampleLimit = int(sp.config.SampleLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit), labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit), labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit), labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
@ -408,7 +416,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
} }
var ( var (
t = sp.activeTargets[fp] t = sp.activeTargets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout} s = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
newLoop = sp.newLoop(scrapeLoopOptions{ newLoop = sp.newLoop(scrapeLoopOptions{
target: t, target: t,
scraper: s, scraper: s,
@ -481,11 +489,12 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// It returns after all stopped scrape loops terminated. // It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) { func (sp *scrapePool) sync(targets []*Target) {
var ( var (
uniqueLoops = make(map[uint64]loop) uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval) interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout) timeout = time.Duration(sp.config.ScrapeTimeout)
sampleLimit = int(sp.config.SampleLimit) bodySizeLimit = int64(sp.config.BodySizeLimit)
labelLimits = &labelLimits{ sampleLimit = int(sp.config.SampleLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit), labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit), labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit), labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
@ -500,7 +509,7 @@ func (sp *scrapePool) sync(targets []*Target) {
hash := t.hash() hash := t.hash()
if _, ok := sp.activeTargets[hash]; !ok { if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout} s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
l := sp.newLoop(scrapeLoopOptions{ l := sp.newLoop(scrapeLoopOptions{
target: t, target: t,
scraper: s, scraper: s,
@ -690,8 +699,12 @@ type targetScraper struct {
gzipr *gzip.Reader gzipr *gzip.Reader
buf *bufio.Reader buf *bufio.Reader
bodySizeLimit int64
} }
var errBodySizeLimit = errors.New("body size limit exceeded")
const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1` const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version) var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
@ -723,11 +736,18 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
return "", errors.Errorf("server returned HTTP status %s", resp.Status) return "", errors.Errorf("server returned HTTP status %s", resp.Status)
} }
if s.bodySizeLimit <= 0 {
s.bodySizeLimit = math.MaxInt64
}
if resp.Header.Get("Content-Encoding") != "gzip" { if resp.Header.Get("Content-Encoding") != "gzip" {
_, err = io.Copy(w, resp.Body) n, err := io.Copy(w, io.LimitReader(resp.Body, s.bodySizeLimit))
if err != nil { if err != nil {
return "", err return "", err
} }
if n >= s.bodySizeLimit {
targetScrapeExceededBodySizeLimit.Inc()
return "", errBodySizeLimit
}
return resp.Header.Get("Content-Type"), nil return resp.Header.Get("Content-Type"), nil
} }
@ -744,11 +764,15 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
} }
} }
_, err = io.Copy(w, s.gzipr) n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
s.gzipr.Close() s.gzipr.Close()
if err != nil { if err != nil {
return "", err return "", err
} }
if n >= s.bodySizeLimit {
targetScrapeExceededBodySizeLimit.Inc()
return "", errBodySizeLimit
}
return resp.Header.Get("Content-Type"), nil return resp.Header.Get("Content-Type"), nil
} }

View file

@ -15,6 +15,7 @@ package scrape
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -1950,6 +1951,69 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err) require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
} }
func TestTargetScraperBodySizeLimit(t *testing.T) {
const (
bodySizeLimit = 15
responseBody = "metric_a 1\nmetric_b 2\n"
)
var gzipResponse bool
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
if gzipResponse {
w.Header().Set("Content-Encoding", "gzip")
gw := gzip.NewWriter(w)
defer gw.Close()
gw.Write([]byte(responseBody))
return
}
w.Write([]byte(responseBody))
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
if err != nil {
panic(err)
}
ts := &targetScraper{
Target: &Target{
labels: labels.FromStrings(
model.SchemeLabel, serverURL.Scheme,
model.AddressLabel, serverURL.Host,
),
},
client: http.DefaultClient,
bodySizeLimit: bodySizeLimit,
}
var buf bytes.Buffer
// Target response uncompressed body, scrape with body size limit.
_, err = ts.scrape(context.Background(), &buf)
require.ErrorIs(t, err, errBodySizeLimit)
require.Equal(t, bodySizeLimit, buf.Len())
// Target response gzip compressed body, scrape with body size limit.
gzipResponse = true
buf.Reset()
_, err = ts.scrape(context.Background(), &buf)
require.ErrorIs(t, err, errBodySizeLimit)
require.Equal(t, bodySizeLimit, buf.Len())
// Target response uncompressed body, scrape without body size limit.
gzipResponse = false
buf.Reset()
ts.bodySizeLimit = 0
_, err = ts.scrape(context.Background(), &buf)
require.NoError(t, err)
require.Equal(t, len(responseBody), buf.Len())
// Target response gzip compressed body, scrape without body size limit.
gzipResponse = true
buf.Reset()
_, err = ts.scrape(context.Background(), &buf)
require.NoError(t, err)
require.Equal(t, len(responseBody), buf.Len())
}
// testScraper implements the scraper interface and allows setting values // testScraper implements the scraper interface and allows setting values
// returned by its methods. It also allows setting a custom scrape function. // returned by its methods. It also allows setting a custom scrape function.
type testScraper struct { type testScraper struct {