diff --git a/scrape/manager.go b/scrape/manager.go index 0e606b4548..dc85f17bb5 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -124,6 +124,9 @@ func NewManager(o *Options, logger log.Logger, app storage.Appendable) *Manager // Options are the configuration parameters to the scrape manager. type Options struct { ExtraMetrics bool + // Option used by downstream scraper users like OpenTelemetry Collector + // to help lookup metric metadata. Should be false for Prometheus. + PassMetadataInContext bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption @@ -195,7 +198,7 @@ func (m *Manager) reload() { level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName) continue } - sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics, m.opts.HTTPClientOptions) + sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics, m.opts.PassMetadataInContext, m.opts.HTTPClientOptions) if err != nil { level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName) continue diff --git a/scrape/scrape.go b/scrape/scrape.go index 20633a1c51..2febb4390a 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -264,7 +264,7 @@ const maxAheadTime = 10 * time.Minute type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics bool, httpOpts []config_util.HTTPClientOption) (*scrapePool, error) { +func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics, passMetadataInContext bool, httpOpts []config_util.HTTPClientOption) (*scrapePool, error) { targetScrapePools.Inc() if logger == nil { logger = log.NewNopLogger() @@ -315,6 +315,9 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed opts.interval, opts.timeout, reportExtraMetrics, + opts.target, + cache, + passMetadataInContext, ) } @@ -855,10 +858,11 @@ type scrapeLoop struct { sampleMutator labelsMutator reportSampleMutator labelsMutator - parentCtx context.Context - ctx context.Context - cancel func() - stopped chan struct{} + parentCtx context.Context + appenderCtx context.Context + ctx context.Context + cancel func() + stopped chan struct{} disabledEndOfRunStalenessMarkers bool @@ -1124,6 +1128,9 @@ func newScrapeLoop(ctx context.Context, interval time.Duration, timeout time.Duration, reportExtraMetrics bool, + target *Target, + metricMetadataStore MetricMetadataStore, + passMetadataInContext bool, ) *scrapeLoop { if l == nil { l = log.NewNopLogger() @@ -1134,6 +1141,18 @@ func newScrapeLoop(ctx context.Context, if cache == nil { cache = newScrapeCache() } + + appenderCtx := ctx + + if passMetadataInContext { + // Store the cache and target in the context. This is then used by downstream OTel Collector + // to lookup the metadata required to process the samples. Not used by Prometheus itself. + // TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory + // leak. We should ideally fix the main leak. 
See: https://github.com/prometheus/prometheus/pull/10590 + appenderCtx = ContextWithMetricMetadataStore(appenderCtx, cache) + appenderCtx = ContextWithTarget(appenderCtx, target) + } + sl := &scrapeLoop{ scraper: sc, buffers: buffers, @@ -1145,6 +1164,7 @@ func newScrapeLoop(ctx context.Context, jitterSeed: jitterSeed, l: l, parentCtx: ctx, + appenderCtx: appenderCtx, honorTimestamps: honorTimestamps, sampleLimit: sampleLimit, labelLimits: labelLimits, @@ -1223,7 +1243,7 @@ mainLoop: // scrapeAndReport performs a scrape and then appends the result to the storage // together with reporting metrics, by using as few appenders as possible. // In the happy scenario, a single appender is used. -// This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should +// This function uses sl.appenderCtx instead of sl.ctx on purpose. A scrape should // only be cancelled on shutdown, not on reloads. func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time { start := time.Now() @@ -1242,7 +1262,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er var total, added, seriesAdded, bytes int var err, appErr, scrapeErr error - app := sl.appender(sl.parentCtx) + app := sl.appender(sl.appenderCtx) defer func() { if err != nil { app.Rollback() @@ -1265,7 +1285,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er // Add stale markers. if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil { app.Rollback() - app = sl.appender(sl.parentCtx) + app = sl.appender(sl.appenderCtx) level.Warn(sl.l).Log("msg", "Append failed", "err", err) } if errc != nil { @@ -1304,13 +1324,13 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime) if appErr != nil { app.Rollback() - app = sl.appender(sl.parentCtx) + app = sl.appender(sl.appenderCtx) level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) // The append failed, probably due to a parse error or sample limit. // Call sl.append again with an empty scrape to trigger stale markers. if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil { app.Rollback() - app = sl.appender(sl.parentCtx) + app = sl.appender(sl.appenderCtx) level.Warn(sl.l).Log("msg", "Append failed", "err", err) } } @@ -1374,8 +1394,8 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Call sl.append again with an empty scrape to trigger stale markers. // If the target has since been recreated and scraped, the // stale markers will be out of order and ignored. - // sl.context would have been cancelled, hence using sl.parentCtx. - app := sl.appender(sl.parentCtx) + // sl.context would have been cancelled, hence using sl.appenderCtx. 
+ app := sl.appender(sl.appenderCtx) var err error defer func() { if err != nil { @@ -1389,7 +1409,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int }() if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil { app.Rollback() - app = sl.appender(sl.parentCtx) + app = sl.appender(sl.appenderCtx) level.Warn(sl.l).Log("msg", "Stale append failed", "err", err) } if err = sl.reportStale(app, staleTime); err != nil { @@ -1791,3 +1811,31 @@ func reusableCache(r, l *config.ScrapeConfig) bool { } return reflect.DeepEqual(zeroConfig(r), zeroConfig(l)) } + +// CtxKey is a dedicated type for keys of context-embedded values propagated +// with the scrape context. +type ctxKey int + +// Valid CtxKey values. +const ( + ctxKeyMetadata ctxKey = iota + 1 + ctxKeyTarget +) + +func ContextWithMetricMetadataStore(ctx context.Context, s MetricMetadataStore) context.Context { + return context.WithValue(ctx, ctxKeyMetadata, s) +} + +func MetricMetadataStoreFromContext(ctx context.Context) (MetricMetadataStore, bool) { + s, ok := ctx.Value(ctxKeyMetadata).(MetricMetadataStore) + return s, ok +} + +func ContextWithTarget(ctx context.Context, t *Target) context.Context { + return context.WithValue(ctx, ctxKeyTarget, t) +} + +func TargetFromContext(ctx context.Context) (*Target, bool) { + t, ok := ctx.Value(ctxKeyTarget).(*Target) + return t, ok +} diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index c771ee1c84..43200d4400 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -56,7 +56,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) + sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil) ) if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { @@ -91,7 +91,7 @@ func TestDroppedTargetsList(t *testing.T) { }, }, } - sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) + sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil) expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}" expectedLength = 1 ) @@ -454,7 +454,7 @@ func TestScrapePoolTargetLimit(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp, _ := newScrapePool(cfg, app, 0, nil, false, nil) + sp, _ := newScrapePool(cfg, app, 0, nil, false, false, nil) loop := sp.newLoop(scrapeLoopOptions{ target: &Target{}, @@ -496,7 +496,7 @@ func TestScrapePoolRaces(t *testing.T) { newConfig := func() *config.ScrapeConfig { return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} } - sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false, nil) + sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false, false, nil) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{ @@ -590,6 +590,9 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { 1, 0, false, + nil, + nil, + false, ) // The scrape pool synchronizes on stopping scrape loops. However, new scrape @@ -659,6 +662,9 @@ func TestScrapeLoopStop(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) // Terminate loop after 2 scrapes. 
@@ -731,6 +737,9 @@ func TestScrapeLoopRun(t *testing.T) { time.Second, time.Hour, false, + nil, + nil, + false, ) // The loop must terminate during the initial offset if the context @@ -783,6 +792,9 @@ func TestScrapeLoopRun(t *testing.T) { time.Second, 100*time.Millisecond, false, + nil, + nil, + false, ) go func() { @@ -839,6 +851,9 @@ func TestScrapeLoopForcedErr(t *testing.T) { time.Second, time.Hour, false, + nil, + nil, + false, ) forcedErr := fmt.Errorf("forced err") @@ -894,6 +909,9 @@ func TestScrapeLoopMetadata(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) defer cancel() @@ -948,6 +966,9 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) { 0, 0, false, + nil, + nil, + false, ) t.Cleanup(func() { cancel() }) @@ -1038,6 +1059,9 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) // Succeed once, several failures, then stop. numScrapes := 0 @@ -1097,6 +1121,9 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) // Succeed once, several failures, then stop. @@ -1160,6 +1187,9 @@ func TestScrapeLoopCache(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) numScrapes := 0 @@ -1239,6 +1269,9 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) numScrapes := 0 @@ -1350,6 +1383,9 @@ func TestScrapeLoopAppend(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -1437,7 +1473,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) { return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil) }, nil, - func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, nil, nil, false, ) slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)) @@ -1473,6 +1509,9 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) fakeRef := storage.SeriesRef(1) @@ -1528,6 +1567,9 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) // Get the value of the Counter before performing the append. 
@@ -1602,6 +1644,9 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -1647,6 +1692,9 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -1695,6 +1743,9 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -1803,6 +1854,9 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -1865,6 +1919,9 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -1914,6 +1971,9 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -1947,6 +2007,9 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -1993,6 +2056,9 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T 0, 0, false, + nil, + nil, + false, ) now := time.Unix(1, 0) @@ -2035,6 +2101,9 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now().Add(20 * time.Minute) @@ -2289,6 +2358,9 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -2327,6 +2399,9 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) now := time.Now() @@ -2364,6 +2439,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) defer cancel() @@ -2419,6 +2497,9 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) defer cancel() @@ -2511,7 +2592,7 @@ func TestReuseScrapeCache(t *testing.T) { ScrapeInterval: model.Duration(5 * time.Second), MetricsPath: "/metrics", } - sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) + sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil) t1 = &Target{ discoveredLabels: labels.Labels{ labels.Label{ @@ -2692,6 +2773,9 @@ func TestScrapeAddFast(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) defer cancel() @@ -2721,7 +2805,7 @@ func TestReuseCacheRace(t *testing.T) { ScrapeInterval: model.Duration(5 * time.Second), MetricsPath: "/metrics", } - sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) + sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil) t1 = &Target{ discoveredLabels: labels.Labels{ labels.Label{ @@ -2780,6 +2864,9 @@ func TestScrapeReportSingleAppender(t *testing.T) { 10*time.Millisecond, time.Hour, false, + nil, + nil, + false, ) numScrapes := 0 @@ -2850,7 +2937,7 @@ func TestScrapeReportLimit(t *testing.T) { })) defer ts.Close() - sp, err := newScrapePool(cfg, s, 0, nil, false, nil) + sp, err := newScrapePool(cfg, s, 0, nil, false, false, nil) require.NoError(t, err) defer sp.stop() @@ -2979,6 +3066,9 @@ func TestScrapeLoopLabelLimit(t *testing.T) { 0, 0, false, + nil, + nil, + false, ) slApp := sl.appender(context.Background()) @@ -3017,7 +3107,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) { }, }, } - sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false, nil) + sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false, false, nil) tgts := []*targetgroup.Group{ { Targets: 
[]model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
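
The new Options.PassMetadataInContext flag is only consumed by downstream embedders of the scrape package; the added comment notes it should stay false for Prometheus itself. Below is a minimal sketch of how an embedder such as the OpenTelemetry Collector's Prometheus receiver could enable it when building a scrape manager. The package name, the newScrapeManager helper, and the app parameter are illustrative and not part of this change; only scrape.NewManager, scrape.Options, and PassMetadataInContext come from the diff above.

package scraperexample

import (
	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

// newScrapeManager builds a scrape.Manager whose appenders receive the scrape
// target and metadata cache through the context. app is whatever
// storage.Appendable the embedder uses to receive samples.
func newScrapeManager(logger log.Logger, app storage.Appendable) *scrape.Manager {
	opts := &scrape.Options{
		// Ask each scrape loop to store its per-target scrape cache and
		// *scrape.Target in the context passed to app.Appender.
		PassMetadataInContext: true,
	}
	return scrape.NewManager(opts, logger, app)
}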
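
On the receiving side, the Appendable handed to the manager sees the enriched context in its Appender method, and the helpers added at the end of scrape.go (MetricMetadataStoreFromContext, TargetFromContext) recover the stored values. The sketch below shows a wrapper Appendable that inspects them before delegating; the metadataAwareAppendable type, its next field, and the example metric name are hypothetical, and the metadata lookup assumes the scrape package's existing MetricMetadataStore interface (GetMetadata by metric name) and Target.Labels().

package scraperexample

import (
	"context"

	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

// metadataAwareAppendable wraps another storage.Appendable and reads the
// values that the scrape loop placed in its appender context.
type metadataAwareAppendable struct {
	next storage.Appendable
}

func (m *metadataAwareAppendable) Appender(ctx context.Context) storage.Appender {
	// Both lookups report ok == false when PassMetadataInContext is disabled,
	// so the default Prometheus path is unaffected.
	if store, ok := scrape.MetricMetadataStoreFromContext(ctx); ok {
		// The store is the scrape cache for this target; metadata for series
		// appended via the returned appender can be looked up by metric name.
		if md, found := store.GetMetadata("http_requests_total"); found {
			_ = md // e.g. type, help and unit for the metric family
		}
	}
	if target, ok := scrape.TargetFromContext(ctx); ok {
		_ = target.Labels() // e.g. to derive resource-level attributes
	}
	return m.next.Appender(ctx)
}

Note that the diff builds appenderCtx once in newScrapeLoop and then uses it everywhere an appender is created (scrapeAndReport, staleness handling), instead of wrapping parentCtx on each scrape; the TODO in the change records that deriving the values from parentCtx caused a memory leak, with the linked PR giving the background.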