Ridwan Sharif 2024-11-09 06:17:35 +08:00 committed by GitHub
commit 94e08d309d
4 changed files with 179 additions and 37 deletions

View file

@@ -90,8 +90,23 @@ type Options struct {
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
// private option for testability.
skipOffsetting bool
// ScrapeOnShutdown allows a final scrape before the manager is shut down.
// Useful for serverless flavors of OTel's prometheusreceiver, which might
// require a final scrape of targets before the instance is shut down.
ScrapeOnShutdown bool
// InitialScrapeOffset controls how long after startup all targets are
// scraped for the first time. By default, each target gets its own offset
// so the scraping load is spread evenly across the Prometheus server.
// Setting this option gives every target the same offset, which may be
// undesirable because load is no longer evenly spread. It is useful,
// however, in serverless deployments that are sensitive to the initial
// offsets and want them to be small and configurable.
//
// NOTE: This option is experimental and not used by Prometheus. It was
// created for serverless flavors of OpenTelemetry contrib's
// prometheusreceiver.
InitialScrapeOffset *time.Duration
}
// Manager maintains a set of scrape pools and manages start/stop cycles
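
Taken together, an embedding application (for example a serverless build of the prometheusreceiver) could opt into both options. The sketch below mirrors the NewManager call used in the tests of this change; the appendable app, the registry, and the 500ms offset are illustrative placeholders:

	initialOffset := 500 * time.Millisecond
	mgr, err := NewManager(
		&Options{
			ScrapeOnShutdown:    true,           // one final scrape when Stop() is called
			InitialScrapeOffset: &initialOffset, // same small startup offset for every target
		},
		promslog.New(&promslog.Config{}),
		nil,
		app, // any storage.Appendable
		prometheus.NewRegistry(),
	)

With these options, targets are first scraped initialOffset after startup rather than at a per-target pseudo-random point within the scrape interval, and Stop() attempts one last scrape of each target before returning.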

View file

@@ -725,11 +725,12 @@ scrape_configs:
}
func setupScrapeManager(t *testing.T, honorTimestamps, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
noOffset := time.Duration(0)
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
skipOffsetting: true,
InitialScrapeOffset: &noOffset,
},
promslog.New(&promslog.Config{}),
nil,
@@ -978,12 +979,13 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
noOffset := time.Duration(0)
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
EnableNativeHistogramsIngestion: true,
skipOffsetting: true,
InitialScrapeOffset: &noOffset,
},
promslog.New(&promslog.Config{}),
nil,
@@ -1509,3 +1511,115 @@ scrape_configs:
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
)
}
func TestManagerStopAfterScrapeAttempt(t *testing.T) {
noOffset := 0 * time.Nanosecond
largeOffset := 99 * time.Hour
oneSecondOffset := 1 * time.Second
tenSecondOffset := 10 * time.Second
interval := 10 * time.Second
for _, tcase := range []struct {
name string
scrapeOnShutdown bool
initialScrapeOffset *time.Duration
stopDelay time.Duration
expectedSamples int
}{
{
name: "no scrape on stop, with offset of 10s",
initialScrapeOffset: &tenSecondOffset,
stopDelay: 5 * time.Second,
expectedSamples: 0,
scrapeOnShutdown: false,
},
{
name: "no scrape on stop, no jitter",
initialScrapeOffset: &noOffset,
stopDelay: 5 * time.Second,
expectedSamples: 1,
scrapeOnShutdown: false,
},
{
name: "scrape on stop, no jitter",
initialScrapeOffset: &noOffset,
stopDelay: 5 * time.Second,
expectedSamples: 2,
scrapeOnShutdown: true,
},
{
name: "scrape on stop, with large offset",
initialScrapeOffset: &largeOffset,
stopDelay: 5 * time.Second,
expectedSamples: 1,
scrapeOnShutdown: true,
},
{
name: "scrape on stop after 5s, with offset of 1s",
initialScrapeOffset: &oneSecondOffset,
stopDelay: 5 * time.Second,
expectedSamples: 2,
scrapeOnShutdown: true,
},
{
name: "scrape on stop after 5s, with offset of 10s",
initialScrapeOffset: &tenSecondOffset,
stopDelay: 5 * time.Second,
expectedSamples: 1,
scrapeOnShutdown: true,
},
} {
t.Run(tcase.name, func(t *testing.T) {
app := &collectResultAppender{}
// Setup scrape manager.
scrapeManager, err := NewManager(
&Options{
ScrapeOnShutdown: tcase.scrapeOnShutdown,
InitialScrapeOffset: tcase.initialScrapeOffset,
},
promslog.New(&promslog.Config{}),
nil,
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(interval),
ScrapeTimeout: model.Duration(interval),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
// Start a fake HTTP target to scrape; it returns a single metric.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("expected_metric 1\n"))
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add the fake target directly into tsets and reload. Normally users would use
// Manager.Run and wait for the minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {
{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
},
},
})
scrapeManager.reload()
// Wait for the configured stop delay before stopping.
time.Sleep(tcase.stopDelay)
scrapeManager.Stop()
// Verify results.
require.Len(t, findSamplesForMetric(app.resultFloats, "expected_metric"), tcase.expectedSamples)
})
}
}

View file

@@ -92,6 +92,7 @@ type scrapePool struct {
scrapeFailureLogger *logging.JSONFileLogger
scrapeFailureLoggerMtx sync.RWMutex
scrapeOnShutdown bool
}
type labelLimits struct {
@@ -150,6 +151,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
logger: logger,
metrics: metrics,
httpOpts: options.HTTPClientOptions,
scrapeOnShutdown: options.ScrapeOnShutdown,
}
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
@@ -190,9 +192,10 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.target,
options.PassMetadataInContext,
metrics,
options.skipOffsetting,
options.InitialScrapeOffset,
opts.validationScheme,
opts.fallbackScrapeProtocol,
sp.scrapeOnShutdown,
)
}
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
@@ -248,7 +251,6 @@ func (sp *scrapePool) getScrapeFailureLogger() *logging.JSONFileLogger {
func (sp *scrapePool) stop() {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.cancel()
var wg sync.WaitGroup
sp.targetMtx.Lock()
@@ -268,6 +270,7 @@ func (sp *scrapePool) stop() {
sp.targetMtx.Unlock()
wg.Wait()
sp.cancel()
sp.client.CloseIdleConnections()
if sp.config != nil {
@@ -907,11 +910,12 @@ type scrapeLoop struct {
sampleMutator labelsMutator
reportSampleMutator labelsMutator
parentCtx context.Context
appenderCtx context.Context
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
appenderCtx context.Context
ctx context.Context
cancel func()
stopped chan struct{}
shutdownScrape chan struct{}
disabledEndOfRunStalenessMarkers bool
@@ -920,7 +924,8 @@ type scrapeLoop struct {
metrics *scrapeMetrics
skipOffsetting bool // For testability.
scrapeOnShutdown bool
initialScrapeOffset *time.Duration
}
// scrapeCache tracks mappings of exposed metric strings to label sets and
@@ -1204,9 +1209,10 @@ func newScrapeLoop(ctx context.Context,
target *Target,
passMetadataInContext bool,
metrics *scrapeMetrics,
skipOffsetting bool,
initialScrapeOffset *time.Duration,
validationScheme model.ValidationScheme,
fallbackScrapeProtocol string,
scrapeOnShutdown bool,
) *scrapeLoop {
if l == nil {
l = promslog.NewNopLogger()
@@ -1238,6 +1244,7 @@ func newScrapeLoop(ctx context.Context,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
shutdownScrape: make(chan struct{}),
offsetSeed: offsetSeed,
l: l,
parentCtx: ctx,
@@ -1258,9 +1265,10 @@ func newScrapeLoop(ctx context.Context,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
skipOffsetting: skipOffsetting,
initialScrapeOffset: initialScrapeOffset,
validationScheme: validationScheme,
fallbackScrapeProtocol: fallbackScrapeProtocol,
scrapeOnShutdown: scrapeOnShutdown,
}
sl.ctx, sl.cancel = context.WithCancel(ctx)
@@ -1277,33 +1285,29 @@ func (sl *scrapeLoop) setScrapeFailureLogger(l *logging.JSONFileLogger) {
}
func (sl *scrapeLoop) run(errc chan<- error) {
if !sl.skipOffsetting {
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
// Continue after a scraping offset.
case <-sl.ctx.Done():
close(sl.stopped)
return
}
jitterDelayTime := sl.scraper.offset(sl.interval, sl.offsetSeed)
if sl.initialScrapeOffset != nil {
jitterDelayTime = *sl.initialScrapeOffset
}
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
close(sl.stopped)
return
case <-sl.shutdownScrape:
sl.cancel()
case <-time.After(jitterDelayTime):
}
var last time.Time
alignedScrapeTime := time.Now().Round(0)
ticker := time.NewTicker(sl.interval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
default:
}
// Temporary workaround for a jitter in go timers that causes disk space
// increase in TSDB.
// See https://github.com/prometheus/prometheus/issues/7846
@@ -1332,6 +1336,8 @@ mainLoop:
return
case <-sl.ctx.Done():
break mainLoop
case <-sl.shutdownScrape:
sl.cancel()
case <-ticker.C:
}
}
@@ -1533,7 +1539,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// Stop the scraping. May still write data and stale markers after it has
// returned. Cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
sl.cancel()
if sl.scrapeOnShutdown {
sl.shutdownScrape <- struct{}{}
} else {
sl.cancel()
}
<-sl.stopped
}
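
For illustration only, here is a self-contained, simplified sketch of the shutdown idea used above (it does not reproduce the exact select structure of scrapeLoop.run): stop either cancels immediately or asks the loop to perform one more scrape before it exits.

	package main

	import (
		"fmt"
		"time"
	)

	// miniLoop is a stripped-down stand-in for scrapeLoop: it scrapes on a
	// ticker and, when stop(true) is called, performs one final scrape before
	// exiting, similar in spirit to the shutdownScrape channel above.
	type miniLoop struct {
		quit    chan bool // value: whether to scrape once more before exiting
		stopped chan struct{}
	}

	func (l *miniLoop) run(interval time.Duration, scrape func()) {
		defer close(l.stopped)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			scrape()
			select {
			case finalScrape := <-l.quit:
				if finalScrape {
					scrape() // the "scrape on shutdown" path
				}
				return
			case <-ticker.C:
			}
		}
	}

	func (l *miniLoop) stop(finalScrape bool) {
		l.quit <- finalScrape
		<-l.stopped // wait for the loop to exit, as scrapeLoop.stop does
	}

	func main() {
		l := &miniLoop{quit: make(chan bool), stopped: make(chan struct{})}
		n := 0
		go l.run(10*time.Second, func() { n++; fmt.Println("scrape", n) })
		time.Sleep(50 * time.Millisecond) // let the first scrape happen
		l.stop(true)                      // triggers "scrape 2" before returning
	}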

View file

@@ -781,14 +781,15 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
nil,
false,
newTestScrapeMetrics(t),
false,
nil,
model.LegacyValidation,
"text/plain",
false,
)
}
func TestScrapeLoopStopBeforeRun(t *testing.T) {
scraper := &testScraper{}
scraper := &testScraper{offsetDur: time.Second}
sl := newBasicScrapeLoop(t, context.Background(), scraper, nil, 1)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
@@ -926,9 +927,10 @@ func TestScrapeLoopRun(t *testing.T) {
nil,
false,
scrapeMetrics,
false,
nil,
model.LegacyValidation,
"text/plain",
false,
)
// The loop must terminate during the initial offset if the context
@@ -1073,9 +1075,10 @@ func TestScrapeLoopMetadata(t *testing.T) {
nil,
false,
scrapeMetrics,
false,
nil,
model.LegacyValidation,
"text/plain",
false,
)
defer cancel()