Merge pull request #12992 from bboreham/single-scrape-buffer-pool

Scraping: share buffer pool across all scrapes
This commit is contained in:
Bryan Boreham 2023-11-24 16:26:19 +00:00 committed by GitHub
commit 784a2d2c74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 15 deletions

View file

@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/osutil"
"github.com/prometheus/prometheus/util/pool"
)
// NewManager is the Manager constructor.
@ -57,6 +58,7 @@ func NewManager(o *Options, logger log.Logger, app storage.Appendable, registere
graceShut: make(chan struct{}),
triggerReload: make(chan struct{}, 1),
metrics: sm,
buffers: pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }),
}
m.metrics.setTargetMetadataCacheGatherer(m)
@ -94,6 +96,7 @@ type Manager struct {
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
targetSets map[string][]*targetgroup.Group
buffers *pool.Pool
triggerReload chan struct{}
@ -156,7 +159,7 @@ func (m *Manager) reload() {
continue
}
m.metrics.targetScrapePools.Inc()
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.opts, m.metrics)
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, log.With(m.logger, "scrape_pool", setName), m.buffers, m.opts, m.metrics)
if err != nil {
m.metrics.targetScrapePoolsFailed.Inc()
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)

View file

@ -117,7 +117,7 @@ const maxAheadTime = 10 * time.Minute
// returning an empty label set is interpreted as "drop".
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger log.Logger, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger log.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@ -127,8 +127,6 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
return nil, fmt.Errorf("error creating HTTP client: %w", err)
}
buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
cancel: cancel,

View file

@ -49,6 +49,7 @@ import (
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/pool"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
)
@ -68,7 +69,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
)
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@ -104,7 +105,7 @@ func TestDroppedTargetsList(t *testing.T) {
},
},
}
sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
expectedLength = 2
)
@ -504,7 +505,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp, _ := newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
loop := sp.newLoop(scrapeLoopOptions{
target: &Target{},
@ -560,7 +561,7 @@ func TestScrapePoolRaces(t *testing.T) {
newConfig := func() *config.ScrapeConfig {
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
}
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{
@ -3279,7 +3280,7 @@ func TestReuseScrapeCache(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics",
}
sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
discoveredLabels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
}
@ -3483,8 +3484,9 @@ func TestReuseCacheRace(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics",
}
sp, _ = newScrapePool(cfg, app, 0, nil, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
sp, _ = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
}
)
@ -3614,7 +3616,7 @@ func TestScrapeReportLimit(t *testing.T) {
}))
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -3788,7 +3790,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
},
},
}
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
@ -3879,7 +3881,7 @@ test_summary_count 199
}))
defer ts.Close()
sp, err := newScrapePool(config, simpleStorage, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4031,7 +4033,7 @@ func TestScrapeLoopCompression(t *testing.T) {
EnableCompression: tc.enableCompression,
}
sp, err := newScrapePool(config, simpleStorage, 0, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()