scraping: delay creating buffer, to save memory (#12953)

We don't need the buffer for reading the response until the scrape HTTP call has returned; taking it from the pool earlier means every in-flight scrape holds a buffer for the whole round trip, which makes the buffer pool larger than it needs to be.
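
To illustrate the effect, here is a standalone sketch (not Prometheus code; `fetch`, `bufPool` and `scrapeOnce` are made-up names for the example). A caller that takes a pooled buffer before the blocking network call keeps that buffer checked out for the entire round trip, so with many concurrent scrapes the pool ends up holding roughly one buffer per in-flight request; taking the buffer only after the call returns keeps buffers checked out only for the short copy phase.

```go
package main

import (
	"bytes"
	"sync"
	"time"
)

// bufPool stands in for the scrape loop's buffer pool.
var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// fetch stands in for the scrape HTTP round trip: slow compared to
// the time spent copying the body into a buffer.
func fetch() []byte {
	time.Sleep(2 * time.Second)
	return []byte("metric_a 1\nmetric_b 2\n")
}

func scrapeOnce() {
	// Getting the buffer *here* (before fetch) would pin one pooled
	// buffer per in-flight request for the whole two seconds.
	body := fetch()

	// Getting it here keeps it checked out only while we copy.
	buf := bufPool.Get().(*bytes.Buffer)
	defer bufPool.Put(buf)
	buf.Reset()
	buf.Write(body)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // 100 targets scraped concurrently
		wg.Add(1)
		go func() {
			defer wg.Done()
			scrapeOnce()
		}()
	}
	wg.Wait()
}
```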

I split `scrape()` in two: `scrape()`, which performs the HTTP request and returns the response, and `readResponse()`, which decompresses the body and copies the data into the supplied buffer. This split was chosen to minimize the impact on the surrounding logic.
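
Condensed from the diff below, the caller side in `scrapeAndReport` changes from "get buffer, then scrape into it" to a two-step flow (error handling and reporting omitted):

```go
// Before: buffer acquired up front and held across the HTTP round trip.
//	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
//	defer sl.buffers.Put(b)
//	buf := bytes.NewBuffer(b)
//	contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)

// After: the buffer is taken from the pool only once the HTTP call has returned.
resp, scrapeErr = sl.scraper.scrape(scrapeCtx)
if scrapeErr == nil {
	b = sl.buffers.Get(sl.lastScrapeSize).([]byte)
	defer sl.buffers.Put(b)
	buf = bytes.NewBuffer(b)
	contentType, scrapeErr = sl.scraper.readResponse(scrapeCtx, resp, buf)
}
```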

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
Author: Bryan Boreham <bjboreham@gmail.com>
Date:   2023-10-09 17:23:53 +01:00 (committed by GitHub)
Commit: f6d9c84fde
Parent: fe90dcccff
2 changed files with 48 additions and 25 deletions

scrape/scrape.go

@@ -785,7 +785,8 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Appender {
 // A scraper retrieves samples and accepts a status report at the end.
 type scraper interface {
-	scrape(ctx context.Context, w io.Writer) (string, error)
+	scrape(ctx context.Context) (*http.Response, error)
+	readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error)
 	Report(start time.Time, dur time.Duration, err error)
 	offset(interval time.Duration, offsetSeed uint64) time.Duration
 }
@@ -814,11 +815,11 @@ const (
 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
 
-func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
+func (s *targetScraper) scrape(ctx context.Context) (*http.Response, error) {
 	if s.req == nil {
 		req, err := http.NewRequest("GET", s.URL().String(), nil)
 		if err != nil {
-			return "", err
+			return nil, err
 		}
 		req.Header.Add("Accept", s.acceptHeader)
 		req.Header.Add("Accept-Encoding", "gzip")
@@ -828,10 +829,10 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
 		s.req = req
 	}
 
-	resp, err := s.client.Do(s.req.WithContext(ctx))
-	if err != nil {
-		return "", err
-	}
+	return s.client.Do(s.req.WithContext(ctx))
+}
+
+func (s *targetScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
 	defer func() {
 		io.Copy(io.Discard, resp.Body)
 		resp.Body.Close()
@@ -858,13 +859,14 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
 
 	if s.gzipr == nil {
 		s.buf = bufio.NewReader(resp.Body)
+		var err error
 		s.gzipr, err = gzip.NewReader(s.buf)
 		if err != nil {
 			return "", err
 		}
 	} else {
 		s.buf.Reset(resp.Body)
-		if err = s.gzipr.Reset(s.buf); err != nil {
+		if err := s.gzipr.Reset(s.buf); err != nil {
 			return "", err
 		}
 	}
@@ -1326,11 +1328,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time {
 		)
 	}
 
-	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
-	defer sl.buffers.Put(b)
-	buf := bytes.NewBuffer(b)
-
-	var total, added, seriesAdded, bytes int
+	var total, added, seriesAdded, bytesRead int
 	var err, appErr, scrapeErr error
 
 	app := sl.appender(sl.appenderCtx)
@@ -1346,7 +1344,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time {
 	}()
 
 	defer func() {
-		if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytes, scrapeErr); err != nil {
+		if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
 			level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
 		}
 	}()
@@ -1367,8 +1365,17 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time {
 	}
 
 	var contentType string
+	var resp *http.Response
+	var b []byte
+	var buf *bytes.Buffer
 	scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, sl.timeout)
-	contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
+	resp, scrapeErr = sl.scraper.scrape(scrapeCtx)
+	if scrapeErr == nil {
+		b = sl.buffers.Get(sl.lastScrapeSize).([]byte)
+		defer sl.buffers.Put(b)
+		buf = bytes.NewBuffer(b)
+		contentType, scrapeErr = sl.scraper.readResponse(scrapeCtx, resp, buf)
+	}
 	cancel()
 
 	if scrapeErr == nil {
@@ -1379,14 +1386,14 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time {
 		if len(b) > 0 {
 			sl.lastScrapeSize = len(b)
 		}
-		bytes = len(b)
+		bytesRead = len(b)
 	} else {
 		level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
 		if errc != nil {
 			errc <- scrapeErr
 		}
 		if errors.Is(scrapeErr, errBodySizeLimit) {
-			bytes = -1
+			bytesRead = -1
 		}
 	}
 

scrape/scrape_test.go

@@ -2619,7 +2619,9 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 	}
 
 	var buf bytes.Buffer
-	contentType, err := ts.scrape(context.Background(), &buf)
+	resp, err := ts.scrape(context.Background())
+	require.NoError(t, err)
+	contentType, err := ts.readResponse(context.Background(), resp, &buf)
 	require.NoError(t, err)
 	require.Equal(t, "text/plain; version=0.0.4", contentType)
 	require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
@@ -2665,7 +2667,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
 	}()
 
 	go func() {
-		_, err := ts.scrape(ctx, io.Discard)
+		_, err := ts.scrape(ctx)
 		switch {
 		case err == nil:
 			errc <- errors.New("Expected error but got nil")
@@ -2711,7 +2713,9 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
 		acceptHeader: scrapeAcceptHeader,
 	}
 
-	_, err = ts.scrape(context.Background(), io.Discard)
+	resp, err := ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, io.Discard)
 	require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
 }
@@ -2755,26 +2759,34 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
 	var buf bytes.Buffer
 
 	// Target response uncompressed body, scrape with body size limit.
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err := ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.ErrorIs(t, err, errBodySizeLimit)
 	require.Equal(t, bodySizeLimit, buf.Len())
 	// Target response gzip compressed body, scrape with body size limit.
 	gzipResponse = true
 	buf.Reset()
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err = ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.ErrorIs(t, err, errBodySizeLimit)
 	require.Equal(t, bodySizeLimit, buf.Len())
 
 	// Target response uncompressed body, scrape without body size limit.
 	gzipResponse = false
 	buf.Reset()
 	ts.bodySizeLimit = 0
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err = ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.NoError(t, err)
 	require.Equal(t, len(responseBody), buf.Len())
 	// Target response gzip compressed body, scrape without body size limit.
 	gzipResponse = true
 	buf.Reset()
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err = ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.NoError(t, err)
 	require.Equal(t, len(responseBody), buf.Len())
 }
@@ -2802,7 +2814,11 @@ func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) {
 	ts.lastError = err
 }
 
-func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
+func (ts *testScraper) scrape(ctx context.Context) (*http.Response, error) {
+	return nil, ts.scrapeErr
+}
+
+func (ts *testScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
 	if ts.scrapeFunc != nil {
 		return "", ts.scrapeFunc(ctx, w)
 	}