From d6f6ad67b389ed135d8946fa923b0a5f7bcba4d0 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 25 Aug 2017 12:36:43 +0200
Subject: [PATCH 1/2] pkg/pool: create bucketed memory pool package.

---
 pkg/pool/pool.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)
 create mode 100644 pkg/pool/pool.go

diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
new file mode 100644
index 000000000..7cfa78f42
--- /dev/null
+++ b/pkg/pool/pool.go
@@ -0,0 +1,75 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pool
+
+import "sync"
+
+// BytesPool is a bucketed pool for variably sized byte slices.
+type BytesPool struct {
+	buckets []sync.Pool
+	sizes   []int
+}
+
+// NewBytesPool returns a new BytesPool with size buckets ranging from minSize
+// to maxSize, increasing by the given factor.
+func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
+	if minSize < 1 {
+		panic("invalid minimum pool size")
+	}
+	if maxSize < 1 {
+		panic("invalid maximum pool size")
+	}
+	if factor <= 1 {
+		panic("invalid factor")
+	}
+
+	var sizes []int
+
+	for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
+		sizes = append(sizes, s)
+	}
+
+	p := &BytesPool{
+		buckets: make([]sync.Pool, len(sizes)),
+		sizes:   sizes,
+	}
+
+	return p
+}
+
+// Get returns a byte slice that fits the given size.
+func (p *BytesPool) Get(sz int) []byte {
+	for i, bktSize := range p.sizes {
+		if sz > bktSize {
+			continue
+		}
+		b, ok := p.buckets[i].Get().([]byte)
+		if !ok {
+			b = make([]byte, 0, bktSize)
+		}
+		return b
+	}
+	return make([]byte, 0, sz)
+}
+
+// Put returns a byte slice to the right bucket in the pool.
+func (p *BytesPool) Put(b []byte) {
+	for i, bktSize := range p.sizes {
+		if cap(b) > bktSize {
+			continue
+		}
+		p.buckets[i].Put(b[:0])
+		return
+	}
+}

From 5bed8af4cb6e74d1b9885d8de26959e9e25de861 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Thu, 7 Sep 2017 14:43:21 +0200
Subject: [PATCH 2/2] retrieval: pool scrape buffers

This adds a bucketed buffer pool to the scrapers so we neither have to
allocate a new buffer on every scrape nor keep one fixed buffer per
scrape loop. The latter can consume significant amounts of unused
memory, e.g. 4GB when scraping 2MB /metrics payloads from 2000 targets.
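For illustration only (not part of the change itself), a minimal sketch of
the Get/Put cycle each scrape now goes through. The fake payload and the
printed stats are invented for the example; the pool parameters, the initial
size estimate, and the non-empty check mirror the diff below:

    package main

    import (
        "bytes"
        "fmt"

        "github.com/prometheus/prometheus/pkg/pool"
    )

    func main() {
        // Buckets from 1KB to 100MB, growing by a factor of 3,
        // as configured in newScrapePool.
        buffers := pool.NewBytesPool(1e3, 100e6, 3)
        lastScrapeSize := 16000 // initial estimate, as in newScrapeLoop

        for i := 0; i < 3; i++ {
            // Reuse a buffer at least as large as the last scrape result.
            buf := bytes.NewBuffer(buffers.Get(lastScrapeSize))
            buf.WriteString("metric_a 1\nmetric_b 2\n") // stand-in for scraper.scrape

            b := buf.Bytes()
            // Only non-empty results update the size estimate, so a
            // misbehaving target cannot falsely reset it.
            if len(b) > 0 {
                lastScrapeSize = len(b)
            }
            // ... parse and append samples from b ...

            // Hand the buffer back to its size bucket.
            buffers.Put(b)
            fmt.Printf("scrape %d: %d bytes, cap %d\n", i, len(b), cap(b))
        }
    }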
---
 retrieval/scrape.go      | 31 ++++++++++++++++++++++++++-----
 retrieval/scrape_test.go | 23 ++++++++++++++---------
 2 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 2135e0268..f23cea97a 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -34,6 +34,7 @@ import (
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/pool"
 	"github.com/prometheus/prometheus/pkg/textparse"
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/pkg/value"
@@ -145,13 +146,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 		logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
 	}
 
+	buffers := pool.NewBytesPool(1e3, 100e6, 3)
+
 	newLoop := func(
 		ctx context.Context,
 		s scraper,
 		app, reportApp func() storage.Appender,
 		l log.Logger,
 	) loop {
-		return newScrapeLoop(ctx, s, app, reportApp, l)
+		return newScrapeLoop(ctx, s, app, reportApp, buffers, l)
 	}
 
 	return &scrapePool{
@@ -470,9 +473,11 @@ type refEntry struct {
 }
 
 type scrapeLoop struct {
-	scraper scraper
-	l       log.Logger
-	cache   *scrapeCache
+	scraper        scraper
+	l              log.Logger
+	cache          *scrapeCache
+	lastScrapeSize int
+	buffers        *pool.BytesPool
 
 	appender       func() storage.Appender
 	reportAppender func() storage.Appender
@@ -573,16 +578,22 @@ func newScrapeLoop(
 	ctx context.Context,
 	sc scraper,
 	app, reportApp func() storage.Appender,
+	buffers *pool.BytesPool,
 	l log.Logger,
 ) *scrapeLoop {
 	if l == nil {
 		l = log.Base()
 	}
+	if buffers == nil {
+		buffers = pool.NewBytesPool(10e3, 100e6, 3)
+	}
 	sl := &scrapeLoop{
 		scraper:        sc,
 		appender:       app,
 		cache:          newScrapeCache(),
 		reportAppender: reportApp,
+		buffers:        buffers,
+		lastScrapeSize: 16000,
 		stopped:        make(chan struct{}),
 		ctx:            ctx,
 		l:              l,
@@ -631,12 +642,20 @@ mainLoop:
 				time.Since(last).Seconds(),
 			)
 		}
 
+		b := sl.buffers.Get(sl.lastScrapeSize)
+		buf := bytes.NewBuffer(b)
 		scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
 		cancel()
-		var b []byte
+
 		if scrapeErr == nil {
 			b = buf.Bytes()
+			// NOTE: There were issues with misbehaving clients in the past
+			// that occasionally returned empty results. We don't want those
+			// to falsely reset our buffer size.
+			if len(b) > 0 {
+				sl.lastScrapeSize = len(b)
+			}
 		} else {
 			sl.l.With("err", scrapeErr.Error()).Debug("scrape failed")
 			if errc != nil {
@@ -656,6 +675,8 @@
 			}
 		}
 
+		sl.buffers.Put(b)
+
 		if scrapeErr == nil {
 			scrapeErr = appErr
 		}
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index a49277ff9..376c23cdd 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -313,7 +313,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 func TestScrapeLoopStopBeforeRun(t *testing.T) {
 	scraper := &testScraper{}
 
-	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
+	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, nil)
 
 	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
 	// loops are started asynchronously. Thus it's possible, that a loop is stopped
@@ -371,7 +371,7 @@ func TestScrapeLoopStop(t *testing.T) {
 	)
 	defer close(signal)
 
-	sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil)
+	sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil, nil)
 
 	// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -428,7 +428,7 @@ func TestScrapeLoopRun(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) // The loop must terminate during the initial offset if the context // is canceled. @@ -466,7 +466,7 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl = newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) go func() { sl.run(time.Second, 100*time.Millisecond, errc) @@ -511,7 +511,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -561,7 +561,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -608,6 +608,7 @@ func TestScrapeLoopAppend(t *testing.T) { func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now() _, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now) @@ -645,6 +646,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now() @@ -688,6 +690,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now() @@ -780,7 +783,7 @@ func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil, nil) // Setup a series to be stale, then 3 samples, then stop. 
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -831,7 +834,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() @@ -853,7 +856,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil) + sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() @@ -895,6 +898,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T func() storage.Appender { return app }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Unix(1, 0) @@ -925,6 +929,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { }, func() storage.Appender { return nopAppender{} }, nil, + nil, ) now := time.Now().Add(20 * time.Minute)
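For reference, a rough test sketch of the bucketing behavior pkg/pool
provides (not part of the patch series; the test name and the concrete
expectations are ours, derived from NewBytesPool, Get, and Put in PATCH 1/2):

    package pool

    import "testing"

    func TestBytesPoolBucketing(t *testing.T) {
        // Buckets: 1000, 3000, 9000, ... capped at 100e6 (factor 3).
        p := NewBytesPool(1e3, 100e6, 3)

        // Get rounds the request up to the capacity of the next bucket.
        b := p.Get(2000)
        if cap(b) < 2000 || len(b) != 0 {
            t.Fatalf("got len %d, cap %d; want len 0 and cap >= 2000", len(b), cap(b))
        }

        // Put files the slice under the first bucket that can hold its
        // capacity, so a later Get of a fitting size may reuse it.
        p.Put(b)

        // Requests beyond the largest bucket fall through to a plain make.
        huge := p.Get(200e6)
        if cap(huge) < 200e6 {
            t.Fatalf("got cap %d, want >= 200e6", cap(huge))
        }
    }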