diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 05f0d51b93..02725e353d 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -180,6 +180,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "extra-scrape-metrics": c.scrape.ExtraMetrics = true level.Info(logger).Log("msg", "Experimental additional scrape metrics enabled") + case "metadata-storage": + c.scrape.EnableMetadataStorage = true + level.Info(logger).Log("msg", "Experimental in-memory metadata storage enabled") case "new-service-discovery-manager": c.enableNewSDManager = true level.Info(logger).Log("msg", "Experimental service discovery manager") diff --git a/scrape/manager.go b/scrape/manager.go index 81ee4ff635..d8a0ce72f9 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -129,6 +129,9 @@ type Options struct { // Option used by downstream scraper users like OpenTelemetry Collector // to help lookup metric metadata. Should be false for Prometheus. PassMetadataInContext bool + // Option to enable the experimental in-memory metadata storage and append + // metadata to the WAL. + EnableMetadataStorage bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration diff --git a/scrape/scrape.go b/scrape/scrape.go index 9a5750f78f..542db15f85 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -41,6 +41,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/timestamp" @@ -318,6 +319,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed opts.interval, opts.timeout, options.ExtraMetrics, + options.EnableMetadataStorage, opts.target, cache, options.PassMetadataInContext, @@ -844,6 +846,7 @@ type cacheEntry struct { lastIter uint64 hash uint64 lset labels.Labels + meta metadata.Metadata } type scrapeLoop struct { @@ -873,7 +876,8 @@ type scrapeLoop struct { disabledEndOfRunStalenessMarkers bool - reportExtraMetrics bool + reportExtraMetrics bool + appendMetadataToWAL bool } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -906,15 +910,15 @@ type scrapeCache struct { // metaEntry holds meta information about a metric. type metaEntry struct { - lastIter uint64 // Last scrape iteration the entry was observed at. - typ textparse.MetricType - help string - unit string + metadata.Metadata + + lastIter uint64 // Last scrape iteration the entry was observed at. + lastIterChange uint64 // Last scrape iteration the entry was changed at. } func (m *metaEntry) size() int { // The attribute lastIter although part of the struct it is not metadata. 
- return len(m.help) + len(m.unit) + len(m.typ) + return len(m.Help) + len(m.Unit) + len(m.Type) } func newScrapeCache() *scrapeCache { @@ -988,11 +992,11 @@ func (c *scrapeCache) get(met string) (*cacheEntry, bool) { return e, true } -func (c *scrapeCache) addRef(met string, ref storage.SeriesRef, lset labels.Labels, hash uint64) { +func (c *scrapeCache) addRef(met string, ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata, hash uint64) { if ref == 0 { return } - c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash} + c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, meta: meta, hash: hash} } func (c *scrapeCache) addDropped(met string) { @@ -1027,10 +1031,13 @@ func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) { e, ok := c.metadata[yoloString(metric)] if !ok { - e = &metaEntry{typ: textparse.MetricTypeUnknown} + e = &metaEntry{Metadata: metadata.Metadata{Type: textparse.MetricTypeUnknown}} c.metadata[string(metric)] = e } - e.typ = t + if e.Type != t { + e.Type = t + e.lastIterChange = c.iter + } e.lastIter = c.iter c.metaMtx.Unlock() @@ -1041,11 +1048,12 @@ func (c *scrapeCache) setHelp(metric, help []byte) { e, ok := c.metadata[yoloString(metric)] if !ok { - e = &metaEntry{typ: textparse.MetricTypeUnknown} + e = &metaEntry{Metadata: metadata.Metadata{Type: textparse.MetricTypeUnknown}} c.metadata[string(metric)] = e } - if e.help != yoloString(help) { - e.help = string(help) + if e.Help != yoloString(help) { + e.Help = string(help) + e.lastIterChange = c.iter } e.lastIter = c.iter @@ -1057,11 +1065,12 @@ func (c *scrapeCache) setUnit(metric, unit []byte) { e, ok := c.metadata[yoloString(metric)] if !ok { - e = &metaEntry{typ: textparse.MetricTypeUnknown} + e = &metaEntry{Metadata: metadata.Metadata{Type: textparse.MetricTypeUnknown}} c.metadata[string(metric)] = e } - if e.unit != yoloString(unit) { - e.unit = string(unit) + if e.Unit != yoloString(unit) { + e.Unit = string(unit) + e.lastIterChange = c.iter } e.lastIter = c.iter @@ -1078,9 +1087,9 @@ func (c *scrapeCache) GetMetadata(metric string) (MetricMetadata, bool) { } return MetricMetadata{ Metric: metric, - Type: m.typ, - Help: m.help, - Unit: m.unit, + Type: m.Type, + Help: m.Help, + Unit: m.Unit, }, true } @@ -1093,9 +1102,9 @@ func (c *scrapeCache) ListMetadata() []MetricMetadata { for m, e := range c.metadata { res = append(res, MetricMetadata{ Metric: m, - Type: e.typ, - Help: e.help, - Unit: e.unit, + Type: e.Type, + Help: e.Help, + Unit: e.Unit, }) } return res @@ -1135,6 +1144,7 @@ func newScrapeLoop(ctx context.Context, interval time.Duration, timeout time.Duration, reportExtraMetrics bool, + appendMetadataToWAL bool, target *Target, metricMetadataStore MetricMetadataStore, passMetadataInContext bool, @@ -1178,6 +1188,7 @@ func newScrapeLoop(ctx context.Context, interval: interval, timeout: timeout, reportExtraMetrics: reportExtraMetrics, + appendMetadataToWAL: appendMetadataToWAL, } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -1457,12 +1468,37 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, } var ( - defTime = timestamp.FromTime(ts) - appErrs = appendErrors{} - sampleLimitErr error - e exemplar.Exemplar // escapes to heap so hoisted out of loop + defTime = timestamp.FromTime(ts) + appErrs = appendErrors{} + sampleLimitErr error + e exemplar.Exemplar // escapes to heap so hoisted out of loop + meta metadata.Metadata + metadataChanged bool ) + // updateMetadata updates the current iteration's metadata 
object and the + // metadataChanged value if we have metadata in the scrape cache AND the + // labelset is for a new series or the metadata for this series has just + // changed. It returns a boolean based on whether the metadata was updated. + updateMetadata := func(lset labels.Labels, isNewSeries bool) bool { + if !sl.appendMetadataToWAL { + return false + } + + sl.cache.metaMtx.Lock() + defer sl.cache.metaMtx.Unlock() + metaEntry, metaOk := sl.cache.metadata[yoloString([]byte(lset.Get(labels.MetricName)))] + if metaOk && (isNewSeries || metaEntry.lastIterChange == sl.cache.iter) { + metadataChanged = true + meta.Type = metaEntry.Type + meta.Unit = metaEntry.Unit + meta.Help = metaEntry.Help + return true + } + return false + } + // Take an appender with limits. app = appender(app, sl.sampleLimit) @@ -1512,6 +1548,10 @@ loop: t = *tp } + // Zero metadata out for current iteration until it's resolved. + meta = metadata.Metadata{} + metadataChanged = false + if sl.cache.getDropped(yoloString(met)) { continue } @@ -1526,6 +1566,9 @@ loop: if ok { ref = ce.ref lset = ce.lset + + // Update metadata only if it changed in the current iteration. + updateMetadata(lset, false) } else { mets = p.Metric(&lset) hash = lset.Hash() @@ -1550,6 +1593,9 @@ loop: targetScrapePoolExceededLabelLimits.Inc() break loop } + + // Append metadata for new series if they were present. + updateMetadata(lset, true) } ref, err = app.Append(ref, lset, t, v) @@ -1566,7 +1612,7 @@ loop: // Bypass staleness logic if there is an explicit timestamp. sl.cache.trackStaleness(hash, lset) } - sl.cache.addRef(mets, ref, lset, hash) + sl.cache.addRef(mets, ref, lset, meta, hash) if sampleAdded && sampleLimitErr == nil { seriesAdded++ } @@ -1589,6 +1635,12 @@ loop: e = exemplar.Exemplar{} // reset for next time round loop } + if sl.appendMetadataToWAL && metadataChanged { + if _, merr := app.UpdateMetadata(ref, lset, meta); merr != nil { + // No need to fail the scrape on errors appending metadata.
+ level.Debug(sl.l).Log("msg", "Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", meta), "err", merr) + } + } } if sampleLimitErr != nil { if err == nil { @@ -1786,7 +1838,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v switch errors.Cause(err) { case nil: if !ok { - sl.cache.addRef(s, ref, lset, lset.Hash()) + sl.cache.addRef(s, ref, lset, metadata.Metadata{}, lset.Hash()) } return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 2b48949c88..69124409be 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/timestamp" @@ -624,6 +625,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { 1, 0, false, + false, nil, nil, false, @@ -696,6 +698,7 @@ func TestScrapeLoopStop(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -771,6 +774,7 @@ func TestScrapeLoopRun(t *testing.T) { time.Second, time.Hour, false, + false, nil, nil, false, @@ -826,6 +830,7 @@ func TestScrapeLoopRun(t *testing.T) { time.Second, 100*time.Millisecond, false, + false, nil, nil, false, @@ -885,6 +890,7 @@ func TestScrapeLoopForcedErr(t *testing.T) { time.Second, time.Hour, false, + false, nil, nil, false, @@ -943,6 +949,7 @@ func TestScrapeLoopMetadata(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1000,6 +1007,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) { 0, 0, false, + false, nil, nil, false, @@ -1093,6 +1101,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -1155,6 +1164,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -1221,6 +1231,7 @@ func TestScrapeLoopCache(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -1303,6 +1314,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -1417,6 +1429,7 @@ func TestScrapeLoopAppend(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1507,7 +1520,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) { return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil) }, nil, - func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, nil, nil, false, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, false, nil, nil, false, ) slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)) @@ -1543,6 +1556,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1555,12 +1569,13 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { require.NoError(t, warning) var lset labels.Labels + var meta metadata.Metadata p.Next() mets := p.Metric(&lset) hash 
:= lset.Hash() // Create a fake entry in the cache - sl.cache.addRef(mets, fakeRef, lset, hash) + sl.cache.addRef(mets, fakeRef, lset, meta, hash) now := time.Now() slApp := sl.appender(context.Background()) @@ -1601,6 +1616,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1678,6 +1694,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1726,6 +1743,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1777,6 +1795,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -1888,6 +1907,7 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000 0, 0, false, + false, nil, nil, false, @@ -1953,6 +1973,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2005,6 +2026,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -2041,6 +2063,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -2090,6 +2113,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T 0, 0, false, + false, nil, nil, false, @@ -2135,6 +2159,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2392,6 +2417,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2433,6 +2459,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2473,6 +2500,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2531,6 +2559,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2807,6 +2836,7 @@ func TestScrapeAddFast(t *testing.T) { 0, 0, false, + false, nil, nil, false, @@ -2898,6 +2928,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { 10*time.Millisecond, time.Hour, false, + false, nil, nil, false, @@ -3100,6 +3131,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) { 0, 0, false, + false, nil, nil, false,
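Usage note (not part of the diff above): the new behaviour is opt-in. Prometheus itself enables it with --enable-feature=metadata-storage, which sets scrape.Options.EnableMetadataStorage in cmd/prometheus/main.go as shown above. Below is a minimal, hypothetical sketch of how a downstream scraper user could opt in programmatically; only the Options fields are taken from scrape/manager.go in this diff, everything else is illustrative.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/scrape"
)

func main() {
	// Hypothetical sketch: enable the experimental in-memory metadata storage so
	// the scrape loop appends new or changed HELP/TYPE/UNIT metadata to the WAL
	// via Appender.UpdateMetadata.
	opts := &scrape.Options{
		EnableMetadataStorage: true,  // option added by this change
		PassMetadataInContext: false, // pre-existing option; stays false for Prometheus
	}
	fmt.Printf("scrape options: %+v\n", opts)
}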