Scraping: share buffer pool across all scrapes
Previously we had one buffer pool per scrapePool, and one scrapePool per configured scrape job. Each pool holds a few unused buffers, so sharing one pool across all scrapePools reduces total heap memory.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
parent 2329fba0e5
commit 9051100aba
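The saving is easiest to see with rough, hypothetical numbers: a server with 20 scrape jobs previously kept 20 independent pools, so if each retained three idle 1 MB buffers after a burst of large scrapes, about 20 × 3 × 1 MB = 60 MB of heap sat unused, while a single shared pool retaining the same three buffers holds roughly 3 MB.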
scrape/manager.go

@@ -32,6 +32,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/osutil"
+	"github.com/prometheus/prometheus/util/pool"
 )
 
 // NewManager is the Manager constructor.
@@ -57,6 +58,7 @@ func NewManager(o *Options, logger log.Logger, app storage.Appendable, registere
 		graceShut:     make(chan struct{}),
 		triggerReload: make(chan struct{}, 1),
 		metrics:       sm,
+		buffers:       pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }),
 	}
 
 	m.metrics.setTargetMetadataCacheGatherer(m)
@@ -94,6 +96,7 @@ type Manager struct {
 	scrapeConfigs map[string]*config.ScrapeConfig
 	scrapePools   map[string]*scrapePool
 	targetSets    map[string][]*targetgroup.Group
+	buffers       *pool.Pool
 
 	triggerReload chan struct{}
 
@@ -156,7 +159,7 @@ func (m *Manager) reload() {
 			continue
 		}
 		m.metrics.targetScrapePools.Inc()
-		sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.opts, m.metrics)
+		sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.buffers, m.opts, m.metrics)
 		if err != nil {
 			m.metrics.targetScrapePoolsFailed.Inc()
 			level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
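The pool constructed in NewManager above is bucketed: pool.New(minSize, maxSize, factor, makeFunc) keeps one free list per size class, from 1 KB up to 100 MB, each class 3x the previous. A minimal sketch of the size classes this yields (bucketSizes is a hypothetical helper written for illustration, not part of the commit or of util/pool's exported API):

    package main

    import "fmt"

    // bucketSizes mimics how pool.New derives its size classes:
    // start at minSize and multiply by factor until maxSize is exceeded.
    func bucketSizes(minSize, maxSize, factor int) []int {
        var sizes []int
        for s := minSize; s <= maxSize; s *= factor {
            sizes = append(sizes, s)
        }
        return sizes
    }

    func main() {
        // 11 classes: 1000, 3000, 9000, ..., 59049000.
        fmt.Println(bucketSizes(1e3, 100e6, 3))
    }

A Get(sz) request is then served from the smallest class whose size is at least sz, so one shared pool can serve scrapes of very different sizes without much waste.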
scrape/scrape.go

@@ -117,7 +117,7 @@ const maxAheadTime = 10 * time.Minute
 // returning an empty label set is interpreted as "drop".
 type labelsMutator func(labels.Labels) labels.Labels
 
-func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger log.Logger, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
+func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger log.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -127,8 +127,6 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
 		return nil, fmt.Errorf("error creating HTTP client: %w", err)
 	}
 
-	buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
-
 	ctx, cancel := context.WithCancel(context.Background())
 	sp := &scrapePool{
 		cancel:     cancel,
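For context, each scrape loop borrows a buffer from this pool before a scrape and returns it afterwards; with the pool now injected from the Manager, a buffer freed by one job's loop can be reused by any other job's. A minimal sketch of that borrow/return lifecycle against the util/pool Get/Put API the diff relies on (the variable names are illustrative, not the actual scrape-loop code):

    package main

    import (
        "bytes"
        "fmt"

        "github.com/prometheus/prometheus/util/pool"
    )

    func main() {
        // Same construction as the shared pool in NewManager.
        buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })

        // Borrow a buffer sized to the previous scrape of this target.
        lastScrapeSize := 64 * 1024
        b := buffers.Get(lastScrapeSize).([]byte)

        // Fill it with the scraped payload.
        buf := bytes.NewBuffer(b)
        buf.WriteString("up 1\n")

        // Return the borrowed slice for any loop, in any scrape pool, to reuse.
        buffers.Put(b)
        fmt.Println("scraped", buf.Len(), "bytes; buffer returned to shared pool")
    }

Most of the test changes below simply pass nil for the new parameter; that works because newScrapeLoop falls back to creating a local pool when handed nil (pre-existing behavior, not shown in this diff).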
scrape/scrape_test.go

@@ -49,6 +49,7 @@ import (
 	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
+	"github.com/prometheus/prometheus/util/pool"
 	"github.com/prometheus/prometheus/util/teststorage"
 	"github.com/prometheus/prometheus/util/testutil"
 )
@@ -68,7 +69,7 @@ func TestNewScrapePool(t *testing.T) {
 	var (
 		app   = &nopAppendable{}
 		cfg   = &config.ScrapeConfig{}
-		sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
+		sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 	)
 
 	if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@@ -104,7 +105,7 @@ func TestDroppedTargetsList(t *testing.T) {
 				},
 			},
 		}
-		sp, _                  = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
+		sp, _                  = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 		expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
 		expectedLength         = 2
 	)
@@ -504,7 +505,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
 func TestScrapePoolAppender(t *testing.T) {
 	cfg := &config.ScrapeConfig{}
 	app := &nopAppendable{}
-	sp, _ := newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
+	sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 
 	loop := sp.newLoop(scrapeLoopOptions{
 		target: &Target{},
@@ -560,7 +561,7 @@ func TestScrapePoolRaces(t *testing.T) {
 	newConfig := func() *config.ScrapeConfig {
 		return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
 	}
-	sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, &Options{}, newTestScrapeMetrics(t))
+	sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 	tgts := []*targetgroup.Group{
 		{
 			Targets: []model.LabelSet{
@@ -3277,7 +3278,7 @@ func TestReuseScrapeCache(t *testing.T) {
 			ScrapeInterval: model.Duration(5 * time.Second),
 			MetricsPath:    "/metrics",
 		}
-		sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
+		sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 		t1 = &Target{
 			discoveredLabels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
 		}
@@ -3481,8 +3482,9 @@ func TestReuseCacheRace(t *testing.T) {
 			ScrapeInterval: model.Duration(5 * time.Second),
 			MetricsPath:    "/metrics",
 		}
-		sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
-		t1 = &Target{
+		buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
+		sp, _   = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
+		t1      = &Target{
 			discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
 		}
 	)
@@ -3612,7 +3614,7 @@ func TestScrapeReportLimit(t *testing.T) {
 	}))
 	defer ts.Close()
 
-	sp, err := newScrapePool(cfg, s, 0, nil, &Options{}, newTestScrapeMetrics(t))
+	sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 	require.NoError(t, err)
 	defer sp.stop()
 
@@ -3786,7 +3788,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
 			},
 		},
 	}
-	sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, &Options{}, newTestScrapeMetrics(t))
+	sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 	tgts := []*targetgroup.Group{
 		{
 			Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
@@ -3877,7 +3879,7 @@ test_summary_count 199
 	}))
 	defer ts.Close()
 
-	sp, err := newScrapePool(config, simpleStorage, 0, nil, &Options{}, newTestScrapeMetrics(t))
+	sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 	require.NoError(t, err)
 	defer sp.stop()
 
@@ -4029,7 +4031,7 @@ func TestScrapeLoopCompression(t *testing.T) {
 				EnableCompression: tc.enableCompression,
 			}
 
-			sp, err := newScrapePool(config, simpleStorage, 0, nil, &Options{}, newTestScrapeMetrics(t))
+			sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 			require.NoError(t, err)
 			defer sp.stop()
 