From 2ccea8391306bc8081fc359b3e5e6d1732beab71 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 5 Mar 2025 12:15:09 +0100 Subject: [PATCH 1/3] Implement rule evaluations This commit extends Prometheus with the option to run aggregation rules during scrape time, before relabeling takes place. All rules are executed independently for each target in the context of individual scrapes. Newly calculated samples are committed to TSDB atomically with all samples from that scrape. Metric relabeling can also be applied on series recorded from scrape rules. Signed-off-by: Filip Petkovski --- cmd/prometheus/main.go | 43 ++-- config/config.go | 35 +++ config/config_test.go | 38 +++ docs/configuration/configuration.md | 27 +++ scrape/manager.go | 18 +- scrape/manager_test.go | 18 +- scrape/rules.go | 231 +++++++++++++++++++ scrape/rules_test.go | 75 ++++++ scrape/scrape.go | 136 ++++++++--- scrape/scrape_test.go | 343 +++++++++++++++++++--------- 10 files changed, 791 insertions(+), 173 deletions(-) create mode 100644 scrape/rules.go create mode 100644 scrape/rules_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4559d51837..feca2430ac 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -270,6 +270,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "promql-delayed-name-removal": c.promqlEnableDelayedNameRemoval = true logger.Info("Experimental PromQL delayed name removal enabled.") + case "scrape-rules": + c.scrape.EnableScrapeRules = true + logger.Info("Experimental scrape-time recording rules enabled.") case "": continue case "old-ui": @@ -744,12 +747,30 @@ func main() { os.Exit(1) } + opts := promql.EngineOpts{ + Logger: logger.With("component", "query engine"), + Reg: prometheus.DefaultRegisterer, + MaxSamples: cfg.queryMaxSamples, + Timeout: time.Duration(cfg.queryTimeout), + ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")), + LookbackDelta: time.Duration(cfg.lookbackDelta), + NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, + // EnableAtModifier and EnableNegativeOffset have to be + // always on for regular PromQL as of Prometheus v2.33. + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: cfg.enablePerStepStats, + EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval, + } + queryEngine := promql.NewEngine(opts) + scrapeManager, err := scrape.NewManager( &cfg.scrape, logger.With("component", "scrape manager"), logging.NewJSONFileLogger, fanoutStorage, prometheus.DefaultRegisterer, + queryEngine, ) if err != nil { logger.Error("failed to create a scrape manager", "err", err) @@ -758,9 +779,7 @@ func main() { var ( tracingManager = tracing.NewManager(logger) - - queryEngine *promql.Engine - ruleManager *rules.Manager + ruleManager *rules.Manager ) if cfg.maxprocsEnable { @@ -787,24 +806,6 @@ func main() { } if !agentMode { - opts := promql.EngineOpts{ - Logger: logger.With("component", "query engine"), - Reg: prometheus.DefaultRegisterer, - MaxSamples: cfg.queryMaxSamples, - Timeout: time.Duration(cfg.queryTimeout), - ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")), - LookbackDelta: time.Duration(cfg.lookbackDelta), - NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, - // EnableAtModifier and EnableNegativeOffset have to be - // always on for regular PromQL as of Prometheus v2.33. 
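For orientation, here is a minimal configuration sketch of the feature this series adds. The `--enable-feature=scrape-rules` flag and the `scrape_rule_configs` block come from the patch itself; the job name, target, and metric names are illustrative assumptions only:

```yaml
# Enable the experimental feature on the command line:
#   prometheus --enable-feature=scrape-rules
scrape_configs:
  - job_name: node                                  # assumed job name
    static_configs:
      - targets: ["localhost:9100"]                 # assumed target
    scrape_rule_configs:
      - record: mode:node_cpu_seconds_total:sum     # assumed record/metric names
        expr: sum by (mode) (node_cpu_seconds_total)
```

Each rule is evaluated per target against the samples of a single scrape, and the recorded series are committed in the same append as the scraped samples.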
- EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: cfg.enablePerStepStats, - EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval, - } - - queryEngine = promql.NewEngine(opts) - ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, Queryable: localStorage, diff --git a/config/config.go b/config/config.go index a395c3f53e..e282ee1d35 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,7 @@ package config import ( "errors" "fmt" + "github.com/prometheus/prometheus/promql/parser" "log/slog" "mime" "net/url" @@ -732,6 +733,8 @@ type ScrapeConfig struct { RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` // List of metric relabel configurations. MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"` + // List of rules to execute at scrape time. + RuleConfigs []*ScrapeRuleConfig `yaml:"scrape_rule_configs,omitempty"` } // SetDirectory joins any relative file paths with dir. @@ -858,6 +861,38 @@ func (c *ScrapeConfig) MarshalYAML() (interface{}, error) { return discovery.MarshalYAMLWithInlineConfigs(c) } +// ScrapeRuleConfig is the configuration for rules executed +// at scrape time for each individual target. +type ScrapeRuleConfig struct { + Expr string `yaml:"expr"` + Record string `yaml:"record"` +} + +func (a *ScrapeRuleConfig) Validate() error { + if a.Record == "" { + return errors.New("aggregation rule record must not be empty") + } + + if a.Expr == "" { + return errors.New("aggregation rule expression must not be empty") + } + + expr, err := parser.ParseExpr(a.Expr) + if err != nil { + return fmt.Errorf("invalid scrape rule expression: %w", err) + } + + parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { + if _, ok := node.(*parser.MatrixSelector); ok { + err = errors.New("matrix selectors are not allowed in scrape rule expressions") + return err + } + return nil + }) + + return err +} + // StorageConfig configures runtime reloadable configuration options. 
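As a usage sketch of the validation added above (assuming the exported `ScrapeRuleConfig` type from this patch), a rule with an instant-vector expression passes, while a range selector is rejected because a single scrape carries no history to evaluate a range over:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/config"
)

func main() {
	ok := config.ScrapeRuleConfig{
		Record: "code:http_requests_total:sum",
		Expr:   "sum by (code) (http_requests_total)",
	}
	fmt.Println(ok.Validate()) // <nil>

	bad := config.ScrapeRuleConfig{
		Record: "code:http_requests_total:rate5m",
		Expr:   "sum by (code) (rate(http_requests_total[5m]))",
	}
	fmt.Println(bad.Validate()) // matrix selectors are not allowed in scrape rule expressions
}
```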
type StorageConfig struct { TSDBConfig *TSDBConfig `yaml:"tsdb,omitempty"` diff --git a/config/config_test.go b/config/config_test.go index faca7dda12..f5b9a323c5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -16,6 +16,7 @@ package config import ( "crypto/tls" "encoding/json" + "errors" "fmt" "net/url" "os" @@ -2566,3 +2567,40 @@ func TestGetScrapeConfigs_Loaded(t *testing.T) { require.NoError(t, err) }) } + +func TestScrapeRuleConfigs(t *testing.T) { + tests := []struct { + name string + ruleConfig ScrapeRuleConfig + err error + }{ + { + name: "valid scrape rule config", + ruleConfig: ScrapeRuleConfig{ + Expr: "sum by (label) (metric)", + Record: "sum:metric:label", + }, + err: nil, + }, + { + name: "matrix selector in record expression", + ruleConfig: ScrapeRuleConfig{ + Expr: "sum by (label) (rate(metric[2m]))", + Record: "sum:metric:label", + }, + err: errors.New("matrix selectors are not allowed in scrape rule expressions"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.ruleConfig.Validate() + if test.err == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Equal(t, test.err.Error(), err.Error()) + } + }) + } +} \ No newline at end of file diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 3550094ff2..25802f71dc 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -419,6 +419,11 @@ static_configs: relabel_configs: [ - ... ] +# List of rules that are executed on samples from each scrape. +# Each rule is evaluated independently and is executed after relabel_configs and before metric_relabel_configs. +scrape_rule_configs: +[ - ... ] + # List of metric relabel configurations. metric_relabel_configs: [ - ... ] @@ -2607,6 +2612,28 @@ anchored on both ends. To un-anchor the regex, use `.*.*`. Care must be taken with `labeldrop` and `labelkeep` to ensure that metrics are still uniquely labeled once the labels are removed. +### `` + +Scrape rules are applied after target relabeling and before metric relabeling. They can be used to evaluate PromQL expressions against samples from individual scrapes. + +Rules are only evaluated in the context of single scrapes and do not take into account samples from different targets, nor samples that are already ingested in Prometheus. + +Metric relabelings can be applied to series generated by scrape rules. + +A potential use of scrape rules is to aggregate away expensive series before they are ingested in Prometheus. +If the source series are not needed, they can be dropped using metric relabeling rules. + +### `` + +The syntax for a scrape rules is as follows: + +```yaml +# The name of the time series to output to. Must be a valid metric name. +record: +# The PromQL expression to evaluate. +expr: +``` + ### `` Metric relabeling is applied to samples as the last step before ingestion. It diff --git a/scrape/manager.go b/scrape/manager.go index 5ef5dccb99..cfcb8b284a 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -37,7 +39,14 @@ import ( ) // NewManager is the Manager constructor. 
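The documentation above mentions aggregating away expensive series before ingestion. A hedged example of that pattern (job, target, metric, and label names are assumptions): the rule output is kept while the high-cardinality source series are dropped by metric relabeling, which runs after the scrape rules.

```yaml
scrape_configs:
  - job_name: app                                            # assumed job name
    static_configs:
      - targets: ["app:8080"]                                # assumed target
    scrape_rule_configs:
      - record: handler:http_request_duration_seconds_sum:sum   # assumed names
        expr: sum by (handler) (http_request_duration_seconds_sum)
    metric_relabel_configs:
      # Drop the raw source series; the recorded aggregate still passes
      # through because its name does not match the anchored regex.
      - source_labels: [__name__]
        regex: http_request_duration_seconds_sum
        action: drop
```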
-func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) { +func NewManager( + o *Options, + logger *slog.Logger, + newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), + app storage.Appendable, + registerer prometheus.Registerer, + queryEngine *promql.Engine, +) (*Manager, error) { if o == nil { o = &Options{} } @@ -61,6 +70,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str triggerReload: make(chan struct{}, 1), metrics: sm, buffers: pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }), + queryEngine: queryEngine, } m.metrics.setTargetMetadataCacheGatherer(m) @@ -90,6 +100,9 @@ type Options struct { // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption + // EnableScrapeRules enables rule evaluation at scrape time. + EnableScrapeRules bool + // private option for testability. skipOffsetting bool } @@ -111,6 +124,7 @@ type Manager struct { targetSets map[string][]*targetgroup.Group buffers *pool.Pool + queryEngine *promql.Engine triggerReload chan struct{} metrics *scrapeMetrics @@ -182,7 +196,7 @@ func (m *Manager) reload() { continue } m.metrics.targetScrapePools.Inc() - sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics) + sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.queryEngine, m.opts, m.metrics) if err != nil { m.metrics.targetScrapePoolsFailed.Inc() m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName) diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 96381fa736..a8ae37c673 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -521,7 +521,7 @@ scrape_configs: ) opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) newLoop := func(scrapeLoopOptions) loop { ch <- struct{}{} @@ -586,7 +586,7 @@ scrape_configs: func TestManagerTargetsUpdates(t *testing.T) { opts := Options{} testRegistry := prometheus.NewRegistry() - m, err := NewManager(&opts, nil, nil, nil, testRegistry) + m, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) ts := make(chan map[string][]*targetgroup.Group) @@ -639,7 +639,7 @@ global: opts := Options{} testRegistry := prometheus.NewRegistry() - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) // Load the first config. @@ -716,7 +716,7 @@ scrape_configs: } opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) reload(scrapeManager, cfg1) @@ -1055,7 +1055,7 @@ func TestUnregisterMetrics(t *testing.T) { // Check that all metrics can be unregistered, allowing a second manager to be created. for i := 0; i < 2; i++ { opts := Options{} - manager, err := NewManager(&opts, nil, nil, nil, reg) + manager, err := NewManager(&opts, nil, nil, nil, reg, nil) require.NotNil(t, manager) require.NoError(t, err) // Unregister all metrics. 
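The constructor change above threads the PromQL engine into the scrape manager. A minimal wiring sketch, mirroring what `cmd/prometheus/main.go` now does; the storage value is a stand-in assumption and the manager is not actually run:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

func main() {
	logger := promslog.NewNopLogger()
	engine := promql.NewEngine(promql.EngineOpts{
		Logger:     logger,
		MaxSamples: 50000000,
		Timeout:    2 * time.Minute,
	})

	var app storage.Appendable // stand-in for the fanout storage used in main.go

	_, err := scrape.NewManager(
		&scrape.Options{EnableScrapeRules: true},
		logger,
		nil, // no scrape failure logger factory (the tests pass nil here too)
		app,
		prometheus.NewRegistry(),
		engine,
	)
	if err != nil {
		panic(err)
	}
}
```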
@@ -1104,13 +1104,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A sdMetrics, discovery.Updatert(100*time.Millisecond), ) - scrapeManager, err := NewManager( - opts, - nil, - nil, - app, - prometheus.NewRegistry(), - ) + scrapeManager, err := NewManager(opts, nil, nil, app, prometheus.NewRegistry(), nil) require.NoError(t, err) go discoveryManager.Run() go scrapeManager.Run(discoveryManager.SyncCh()) diff --git a/scrape/rules.go b/scrape/rules.go new file mode 100644 index 0000000000..5e288c6126 --- /dev/null +++ b/scrape/rules.go @@ -0,0 +1,231 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scrape + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" +) + +type RuleEngine interface { + NewScrapeBatch() Batch + EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) +} + +// ruleEngine evaluates rules from individual targets at scrape time. +type ruleEngine struct { + rules []*config.ScrapeRuleConfig + engine *promql.Engine +} + +// newRuleEngine creates a new RuleEngine. +func newRuleEngine( + rules []*config.ScrapeRuleConfig, + queryEngine *promql.Engine, +) RuleEngine { + if len(rules) == 0 { + return &nopRuleEngine{} + } + + return &ruleEngine{ + rules: rules, + engine: queryEngine, + } +} + +// NewScrapeBatch creates a new Batch which can be used to add samples from a single scrape. +// Rules are always evaluated on a single Batch. +func (r *ruleEngine) NewScrapeBatch() Batch { + return &batch{ + samples: make([]Sample, 0), + } +} + +// EvaluateRules executes rules on the given Batch and returns new Samples. 
+func (r *ruleEngine) EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) { + var ( + result []Sample + builder labels.ScratchBuilder + ) + for _, rule := range r.rules { + queryable := storage.QueryableFunc(func(_, _ int64) (storage.Querier, error) { + return b, nil + }) + + query, err := r.engine.NewInstantQuery(context.Background(), queryable, nil, rule.Expr, ts) + if err != nil { + return nil, err + } + + samples, err := query.Exec(context.Background()).Vector() + if err != nil { + return nil, err + } + + for _, s := range samples { + builder.Reset() + metricNameSet := false + s.Metric.Range(func(lbl labels.Label) { + if lbl.Name == labels.MetricName { + metricNameSet = true + builder.Add(labels.MetricName, rule.Record) + } else { + builder.Add(lbl.Name, lbl.Value) + } + }) + if !metricNameSet { + builder.Add(labels.MetricName, rule.Record) + } + builder.Sort() + result = append(result, Sample{ + metric: sampleMutator(builder.Labels()), + t: s.T, + f: s.F, + fh: s.H, + }) + } + } + + return result, nil +} + +// Batch is used to collect floats from a single scrape. +type Batch interface { + storage.Querier + AddFloatSample(labels.Labels, int64, float64) + AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram) +} + +type batch struct { + samples []Sample +} + +type Sample struct { + metric labels.Labels + t int64 + f float64 + fh *histogram.FloatHistogram +} + +func (b *batch) AddFloatSample(lbls labels.Labels, t int64, f float64) { + b.samples = append(b.samples, Sample{ + metric: lbls, + t: t, + f: f, + }) +} + +func (b *batch) AddHistogramSample(lbls labels.Labels, t int64, fh *histogram.FloatHistogram) { + b.samples = append(b.samples, Sample{ + metric: lbls, + t: t, + fh: fh, + }) +} + +func (b *batch) Select(_ context.Context, _ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + var samples []Sample + for _, s := range b.samples { + match := true + for _, matcher := range matchers { + if !matcher.Matches(s.metric.Get(matcher.Name)) { + match = false + break + } + } + if match { + samples = append(samples, s) + } + } + + return &seriesSet{ + i: -1, + samples: samples, + } +} + +func (b *batch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *batch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *batch) Close() error { return nil } + +type seriesSet struct { + i int + samples []Sample +} + +func (s *seriesSet) Next() bool { + s.i++ + return s.i != len(s.samples) +} + +func (s *seriesSet) At() storage.Series { + sample := s.samples[s.i] + if sample.fh != nil { + return promql.NewStorageSeries(promql.Series{ + Metric: sample.metric, + Histograms: []promql.HPoint{{T: sample.t, H: sample.fh}}, + }) + } else { + return promql.NewStorageSeries(promql.Series{ + Metric: sample.metric, + Floats: []promql.FPoint{{T: sample.t, F: sample.f}}, + }) + } +} + +func (s *seriesSet) Err() error { return nil } +func (s *seriesSet) Warnings() annotations.Annotations { return nil } + +// nopRuleEngine does not produce any new floats when evaluating rules. 
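Because `batch` implements `storage.Querier`, the PromQL engine evaluates each rule against only the samples collected in the current scrape. A rough sketch of what a `Select` call sees, written as if inside the `scrape` package since the types are unexported:

```go
package scrape

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
)

func batchSelectSketch() {
	b := &batch{}
	now := time.Now().UnixMilli()
	b.AddFloatSample(labels.FromStrings("__name__", "http_requests_total", "code", "200"), now, 10)
	b.AddFloatSample(labels.FromStrings("__name__", "http_requests_total", "code", "500"), now, 1)

	// The engine issues matcher-based Select calls like this one while it
	// evaluates a rule; only series from this batch can ever match.
	ss := b.Select(context.Background(), false, nil,
		labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"),
		labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
	)
	for ss.Next() {
		fmt.Println(ss.At().Labels())
	}
	// Prints the single matching series: {__name__="http_requests_total", code="200"}
}
```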
+type nopRuleEngine struct{} + +func (n nopRuleEngine) NewScrapeBatch() Batch { return &nopBatch{} } + +func (n nopRuleEngine) EvaluateRules(Batch, time.Time, labelsMutator) ([]Sample, error) { + return nil, nil +} + +type nopBatch struct{} + +func (b *nopBatch) AddFloatSample(labels.Labels, int64, float64) {} + +func (b *nopBatch) AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram) { +} + +func (b *nopBatch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *nopBatch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *nopBatch) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { + return nil +} + +func (b *nopBatch) Close() error { return nil } diff --git a/scrape/rules_test.go b/scrape/rules_test.go new file mode 100644 index 0000000000..ba14535dfd --- /dev/null +++ b/scrape/rules_test.go @@ -0,0 +1,75 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scrape + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/promql" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" +) + +func TestRuleEngine(t *testing.T) { + now := time.Now() + samples := []Sample{ + { + metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/"), + t: now.UnixMilli(), + f: 10, + }, + { + metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/metrics"), + t: now.UnixMilli(), + f: 6, + }, + } + rules := []*config.ScrapeRuleConfig{ + { + Expr: "sum by (code) (http_requests_total)", + Record: "code:http_requests_total:sum", + }, + } + + engine := promql.NewEngine(promql.EngineOpts{ + MaxSamples: 50000000, + Timeout: 10 * time.Second, + LookbackDelta: 5 * time.Minute, + }) + re := newRuleEngine(rules, engine) + b := re.NewScrapeBatch() + for _, s := range samples { + b.AddFloatSample(s.metric, s.t, s.f) + } + + result, err := re.EvaluateRules(b, now, nopMutator) + require.NoError(t, err) + + if len(result) != 1 { + t.Fatalf("Invalid sample count, got %d, want %d", len(result), 3) + } + + expectedSamples := []Sample{ + { + metric: labels.FromStrings("__name__", "code:http_requests_total:sum", "code", "200"), + t: now.UnixMilli(), + f: 16, + }, + } + require.ElementsMatch(t, expectedSamples, result) +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 2cd3d78d53..609d39caba 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -31,6 +31,8 @@ import ( "time" "unsafe" + "github.com/prometheus/prometheus/promql" + "github.com/klauspost/compress/gzip" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -102,6 +104,8 @@ type scrapePool struct { scrapeFailureLogger FailureLogger 
scrapeFailureLoggerMtx sync.RWMutex + + enableScrapeRuleEval bool } type labelLimits struct { @@ -137,7 +141,16 @@ const maxAheadTime = 10 * time.Minute // returning an empty label set is interpreted as "drop". type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) { +func newScrapePool( + cfg *config.ScrapeConfig, + app storage.Appendable, + offsetSeed uint64, + logger *slog.Logger, + buffers *pool.Pool, + queryEngine *promql.Engine, + options *Options, + metrics *scrapeMetrics, +) (*scrapePool, error) { if logger == nil { logger = promslog.NewNopLogger() } @@ -160,6 +173,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed logger: logger, metrics: metrics, httpOpts: options.HTTPClientOptions, + enableScrapeRuleEval: options.EnableScrapeRules, } sp.newLoop = func(opts scrapeLoopOptions) loop { // Update the targets retrieval function for metadata to a new scrape cache. @@ -169,6 +183,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed } opts.target.SetMetadataStore(cache) + var re RuleEngine + if sp.enableScrapeRuleEval { + re = newRuleEngine(cfg.RuleConfigs, queryEngine) + } else { + re = &nopRuleEngine{} + } + return newScrapeLoop( ctx, opts.scraper, @@ -179,6 +200,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed }, func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) }, func(ctx context.Context) storage.Appender { return app.Appender(ctx) }, + re, cache, sp.symbolTable, offsetSeed, @@ -893,6 +915,7 @@ type scrapeLoop struct { l *slog.Logger scrapeFailureLogger FailureLogger scrapeFailureLoggerMtx sync.RWMutex + ruleEngine RuleEngine cache *scrapeCache lastScrapeSize int buffers *pool.Pool @@ -1200,13 +1223,15 @@ func (c *scrapeCache) LengthMetadata() int { return len(c.metadata) } -func newScrapeLoop(ctx context.Context, +func newScrapeLoop( + ctx context.Context, sc scraper, l *slog.Logger, buffers *pool.Pool, sampleMutator labelsMutator, reportSampleMutator labelsMutator, appender func(ctx context.Context) storage.Appender, + re RuleEngine, cache *scrapeCache, symbolTable *labels.SymbolTable, offsetSeed uint64, @@ -1256,6 +1281,7 @@ func newScrapeLoop(ctx context.Context, sl := &scrapeLoop{ scraper: sc, buffers: buffers, + ruleEngine: re, cache: cache, appender: appender, symbolTable: symbolTable, @@ -1648,6 +1674,9 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, sl.cache.iterDone(true) }() + // Make a new batch for evaluating scrape rules + scrapeBatch := sl.ruleEngine.NewScrapeBatch() + loop: for { var ( @@ -1700,15 +1729,11 @@ loop: t = *parsedTimestamp } - if sl.cache.getDropped(met) { - continue - } - ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met) var ( ref storage.SeriesRef hash uint64 ) - + ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met) if seriesCached { ref = ce.ref lset = ce.lset @@ -1716,31 +1741,44 @@ loop: } else { p.Labels(&lset) hash = lset.Hash() - - // Hash label set as it is seen local to the target. Then add target labels - // and relabeling and store the final label set. - lset = sl.sampleMutator(lset) - - // The label set may be set to empty to indicate dropping. 
- if lset.IsEmpty() { - sl.cache.addDropped(met) - continue + } + if isHistogram && sl.enableNativeHistogramIngestion { + if h != nil { + scrapeBatch.AddHistogramSample(lset, t, h.ToFloat(nil)) + } else { + scrapeBatch.AddHistogramSample(lset, t, fh) } + } else { + scrapeBatch.AddFloatSample(lset, t, val) + } - if !lset.Has(labels.MetricName) { - err = errNameLabelMandatory - break loop - } - if !lset.IsValid(sl.validationScheme) { - err = fmt.Errorf("invalid metric name or label names: %s", lset.String()) - break loop - } + if sl.cache.getDropped(met) { + continue + } - // If any label limits is exceeded the scrape should fail. - if err = verifyLabelLimits(lset, sl.labelLimits); err != nil { - sl.metrics.targetScrapePoolExceededLabelLimits.Inc() - break loop - } + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + lset = sl.sampleMutator(lset) + + // The label set may be set to empty to indicate dropping. + if lset.IsEmpty() { + sl.cache.addDropped(met) + continue + } + + if !lset.Has(labels.MetricName) { + err = errNameLabelMandatory + break loop + } + if !lset.IsValid(sl.validationScheme) { + err = fmt.Errorf("invalid metric name or label names: %s", lset.String()) + break loop + } + + // If any label limits is exceeded the scrape should fail. + if err = verifyLabelLimits(lset, sl.labelLimits); err != nil { + sl.metrics.targetScrapePoolExceededLabelLimits.Inc() + break loop } if seriesAlreadyScraped && parsedTimestamp == nil { @@ -1802,8 +1840,8 @@ loop: } // Increment added even if there's an error so we correctly report the - // number of samples remaining after relabeling. - // We still report duplicated samples here since this number should be the exact number + // number of floats remaining after relabeling. + // We still report duplicated floats here since this number should be the exact number // of time series exposed on a scrape after relabelling. added++ exemplars = exemplars[:0] // Reset and reuse the exemplar slice. 
@@ -1889,9 +1927,41 @@ loop: if appErrs.numExemplarOutOfOrder > 0 { sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) } - if err == nil { - err = sl.updateStaleMarkers(app, defTime) + if err != nil { + return } + if err = sl.updateStaleMarkers(app, defTime); err != nil { + return + } + + ruleSamples, ruleErr := sl.ruleEngine.EvaluateRules(scrapeBatch, ts, sl.sampleMutator) + if ruleErr != nil { + err = ruleErr + return + } + var recordBuf []byte + for _, s := range ruleSamples { + recordBuf = recordBuf[:0] + added++ + var ( + ce *cacheEntry + ok bool + ) + recordBytes := s.metric.Bytes(recordBuf) + ce, ok, _ = sl.cache.get(recordBytes) + if ok { + _, err = app.Append(ce.ref, s.metric, s.t, s.f) + if err != nil { + return + } + } else { + var ref storage.SeriesRef + ref, err = app.Append(0, s.metric, s.t, s.f) + sl.cache.addRef(recordBytes, ref, s.metric, s.metric.Hash()) + seriesAdded++ + } + } + return } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 6bb140cca8..55618adf5e 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -34,6 +34,8 @@ import ( "text/template" "time" + "github.com/prometheus/prometheus/promql" + "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/grafana/regexp" @@ -84,7 +86,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) ) a, ok := sp.appendable.(*nopAppendable) @@ -339,7 +341,7 @@ func TestDroppedTargetsList(t *testing.T) { }, }, } - sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}" expectedLength = 2 ) @@ -509,7 +511,7 @@ func TestScrapePoolReload(t *testing.T) { mtx.Lock() targetScraper := opts.scraper.(*targetScraper) - require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper) + require.True(t, stopped[targetScraper.hash()], "Scrape loop for %f not stopped yet", targetScraper) mtx.Unlock() } return l @@ -778,7 +780,7 @@ func TestScrapePoolTargetLimit(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ := newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) loop := sp.newLoop(scrapeLoopOptions{ target: &Target{}, @@ -851,7 +853,7 @@ func TestScrapePoolRaces(t *testing.T) { newConfig := func() *config.ScrapeConfig { return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} } - sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{ @@ -937,35 +939,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app } func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop { - return 
newScrapeLoop(ctx, - scraper, - nil, nil, - nopMutator, - nopMutator, - app, - nil, - labels.NewSymbolTable(), - 0, - true, - false, - true, - 0, 0, histogram.ExponentialSchemaMax, - nil, - interval, - time.Hour, - false, - false, - false, - false, - false, - true, - nil, - false, - newTestScrapeMetrics(t), - false, - model.LegacyValidation, - fallback, - ) + return newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, interval, time.Hour, false, false, false, false, false, true, nil, false, newTestScrapeMetrics(t), false, model.LegacyValidation, fallback) } func TestScrapeLoopStopBeforeRun(t *testing.T) { @@ -1083,35 +1057,7 @@ func TestScrapeLoopRun(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, - scraper, - nil, nil, - nopMutator, - nopMutator, - app, - nil, - nil, - 0, - true, - false, - true, - 0, 0, histogram.ExponentialSchemaMax, - nil, - time.Second, - time.Hour, - false, - false, - false, - false, - false, - false, - nil, - false, - scrapeMetrics, - false, - model.LegacyValidation, - "", - ) + sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, nil, 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, time.Second, time.Hour, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "") // The loop must terminate during the initial offset if the context // is canceled. @@ -1230,35 +1176,7 @@ func TestScrapeLoopMetadata(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, - scraper, - nil, nil, - nopMutator, - nopMutator, - func(_ context.Context) storage.Appender { return nopAppender{} }, - cache, - labels.NewSymbolTable(), - 0, - true, - false, - true, - 0, 0, histogram.ExponentialSchemaMax, - nil, - 0, - 0, - false, - false, - false, - false, - false, - false, - nil, - false, - scrapeMetrics, - false, - model.LegacyValidation, - "", - ) + sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, func(_ context.Context) storage.Appender { return nopAppender{} }, nopRuleEngine{}, cache, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, 0, 0, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "") defer cancel() slApp := sl.appender(ctx) @@ -3382,7 +3300,7 @@ func TestReuseScrapeCache(t *testing.T) { ScrapeInterval: model.Duration(5 * time.Second), MetricsPath: "/metrics", } - sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) t1 = &Target{ labels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"), scrapeConfig: &config.ScrapeConfig{}, @@ -3567,7 +3485,7 @@ func TestReuseCacheRace(t *testing.T) { MetricsPath: "/metrics", } buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) - sp, _ = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, buffers, nil, &Options{}, newTestScrapeMetrics(t)) t1 = &Target{ labels: labels.FromStrings("labelNew", "nameNew"), scrapeConfig: &config.ScrapeConfig{}, @@ -3668,7 +3586,7 @@ func TestScrapeReportLimit(t *testing.T) { ts, scrapedTwice := 
newScrapableServer("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n") defer ts.Close() - sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -3724,7 +3642,7 @@ func TestScrapeUTF8(t *testing.T) { ts, scrapedTwice := newScrapableServer("{\"with.dots\"} 42\n") defer ts.Close() - sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -3871,7 +3789,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) { }, }, } - sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}}, @@ -3953,7 +3871,7 @@ test_summary_count 199 ts, scrapedTwice := newScrapableServer(metricsText) defer ts.Close() - sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -4297,7 +4215,7 @@ metric: < foundLeValues[v] = true } - require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %v but found %v", expectedValues, foundLeValues) + require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %f but found %f", expectedValues, foundLeValues) for _, v := range expectedValues { require.Contains(t, foundLeValues, v, "label value not found") } @@ -4455,7 +4373,7 @@ metric: < })) defer ts.Close() - sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -4603,7 +4521,7 @@ func TestScrapeLoopCompression(t *testing.T) { EnableCompression: tc.enableCompression, } - sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -4885,7 +4803,7 @@ scrape_configs: s.DB.EnableNativeHistograms() reg := prometheus.NewRegistry() - mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg) + mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg, nil) require.NoError(t, err) cfg, err := config.Load(configStr, promslog.NewNopLogger()) require.NoError(t, err) @@ -4928,7 +4846,7 @@ scrape_configs: it := series.Iterator(nil) for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { if vt != chunkenc.ValHistogram { - // don't care about other samples + // don't care about other floats continue } _, h := it.AtHistogram(nil) @@ -4987,7 +4905,7 
@@ func TestTargetScrapeConfigWithLabels(t *testing.T) { } } - sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) t.Cleanup(sp.stop) @@ -5138,7 +5056,7 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) { }, } - p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) t.Cleanup(p.stop) @@ -5153,3 +5071,218 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) { <-time.After(1 * time.Second) } + +func TestScrapeWithScrapeRules(t *testing.T) { + ts := time.Now() + + tests := []struct { + name string + scrape string + discoveryLabels []string + scrapeRules []*config.ScrapeRuleConfig + relabelConfig []*relabel.Config + expectedFloats []floatSample + }{ + { + name: "scrape rules without relabeling", + scrape: ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5`, + discoveryLabels: []string{"instance", "local"}, + scrapeRules: []*config.ScrapeRuleConfig{ + { + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + }, + }, + relabelConfig: nil, + expectedFloats: []floatSample{ + { + metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "1"), + t: ts.UnixMilli(), + f: 3, + }, + { + metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "2"), + t: ts.UnixMilli(), + f: 5, + }, + { + metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"), + t: ts.UnixMilli(), + f: 8, + }, + }, + }, + { + name: "scrape rules with relabeling on original series", + scrape: ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5`, + discoveryLabels: []string{"instance", "local"}, + scrapeRules: []*config.ScrapeRuleConfig{ + { + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + }, + }, + relabelConfig: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("metric"), + Action: relabel.Drop, + }, + }, + expectedFloats: []floatSample{ + { + metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"), + t: ts.UnixMilli(), + f: 8, + }, + }, + }, + { + name: "scrape rules with relabeling on recorded series", + scrape: ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5`, + discoveryLabels: []string{"instance", "local"}, + scrapeRules: []*config.ScrapeRuleConfig{ + { + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + }, + }, + relabelConfig: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("l1:metric:sum"), + Action: relabel.Keep, + }, + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("l1:metric:sum"), + Action: relabel.Replace, + TargetLabel: "__name__", + Replacement: "relabeled_rule", + }, + }, + expectedFloats: []floatSample{ + { + metric: labels.FromStrings("__name__", "relabeled_rule", "instance", "local", "l1", "1"), + t: ts.UnixMilli(), + f: 8, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var ( + app = &collectResultAppender{} + re = newRuleEngine(test.scrapeRules, promql.NewEngine( + promql.EngineOpts{ + MaxSamples: 500000, + Timeout: 30 * time.Second, + }), + ) + target = &Target{ + labels: labels.FromStrings(test.discoveryLabels...), + } + ) + sl := 
newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender { return app }, 0) + sl.sampleMutator = func(l labels.Labels) labels.Labels { + return mutateSampleLabels(l, target, false, test.relabelConfig) + } + sl.reportSampleMutator = func(l labels.Labels) labels.Labels { + return mutateReportSampleLabels(l, target) + } + sl.ruleEngine = re + + slApp := sl.appender(context.Background()) + _, _, _, err := sl.append(slApp, []byte(test.scrape), "text/plain", ts) + require.NoError(t, err) + + require.NoError(t, slApp.Commit()) + require.Equal(t, test.expectedFloats, app.resultFloats) + }) + } +} + +func TestScrapeReportWithScrapeRules(t *testing.T) { + ts := time.Now() + + scrapeRule := config.ScrapeRuleConfig{ + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + } + + scrape := ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5` + + _, sl := simpleTestScrapeLoop(t) + sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine( + promql.EngineOpts{ + MaxSamples: 500000, + Timeout: 30 * time.Second, + }, + )) + + slApp := sl.appender(context.Background()) + scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 3, added) + require.Equal(t, 3, seriesAdded) + + scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 3, added) + require.Equal(t, 0, seriesAdded) +} + +func TestScrapeReportWithScrapeRulesAndRelabeling(t *testing.T) { + ts := time.Now() + + scrapeRule := config.ScrapeRuleConfig{ + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + } + + scrape := ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5` + + _, sl := simpleTestScrapeLoop(t) + sl.sampleMutator = func(l labels.Labels) labels.Labels { + return mutateSampleLabels(l, &Target{}, false, []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("metric"), + Action: relabel.Drop, + }, + }) + } + sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine( + promql.EngineOpts{ + MaxSamples: 500000, + Timeout: 30 * time.Second, + }, + )) + + slApp := sl.appender(context.Background()) + scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 1, added) + require.Equal(t, 1, seriesAdded) + + scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 1, added) + require.Equal(t, 0, seriesAdded) +} From 355a84ad4c73f79c14997d4423ad9da921fd037e Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 5 Mar 2025 13:40:08 +0100 Subject: [PATCH 2/3] Run lint Signed-off-by: Filip Petkovski --- config/config.go | 4 ++-- config/config_test.go | 2 +- scrape/rules.go | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index e282ee1d35..c1ed2d0658 100644 --- a/config/config.go +++ b/config/config.go @@ -16,7 +16,6 @@ package config import ( "errors" "fmt" - "github.com/prometheus/prometheus/promql/parser" "log/slog" "mime" "net/url" @@ -37,6 +36,7 @@ import ( "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + 
"github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage/remote/azuread" "github.com/prometheus/prometheus/storage/remote/googleiam" ) @@ -882,7 +882,7 @@ func (a *ScrapeRuleConfig) Validate() error { return fmt.Errorf("invalid scrape rule expression: %w", err) } - parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { + parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error { if _, ok := node.(*parser.MatrixSelector); ok { err = errors.New("matrix selectors are not allowed in scrape rule expressions") return err diff --git a/config/config_test.go b/config/config_test.go index f5b9a323c5..b79c91c8a5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2603,4 +2603,4 @@ func TestScrapeRuleConfigs(t *testing.T) { } }) } -} \ No newline at end of file +} diff --git a/scrape/rules.go b/scrape/rules.go index 5e288c6126..f5b51ee77d 100644 --- a/scrape/rules.go +++ b/scrape/rules.go @@ -189,12 +189,12 @@ func (s *seriesSet) At() storage.Series { Metric: sample.metric, Histograms: []promql.HPoint{{T: sample.t, H: sample.fh}}, }) - } else { - return promql.NewStorageSeries(promql.Series{ - Metric: sample.metric, - Floats: []promql.FPoint{{T: sample.t, F: sample.f}}, - }) } + + return promql.NewStorageSeries(promql.Series{ + Metric: sample.metric, + Floats: []promql.FPoint{{T: sample.t, F: sample.f}}, + }) } func (s *seriesSet) Err() error { return nil } From de887c5fe19689870fbbb740e3360e9136ee9c85 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 5 Mar 2025 15:16:32 +0100 Subject: [PATCH 3/3] Fix dedupelabels Signed-off-by: Filip Petkovski --- scrape/rules.go | 2 +- scrape/scrape_test.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/scrape/rules.go b/scrape/rules.go index f5b51ee77d..45364c224d 100644 --- a/scrape/rules.go +++ b/scrape/rules.go @@ -63,7 +63,7 @@ func (r *ruleEngine) NewScrapeBatch() Batch { func (r *ruleEngine) EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) { var ( result []Sample - builder labels.ScratchBuilder + builder = labels.NewScratchBuilder(0) ) for _, rule := range r.rules { queryable := storage.QueryableFunc(func(_, _ int64) (storage.Querier, error) { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 55618adf5e..0e09003d82 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -5205,7 +5205,13 @@ metric{l1="1", l2="2"} 5`, require.NoError(t, err) require.NoError(t, slApp.Commit()) - require.Equal(t, test.expectedFloats, app.resultFloats) + require.Len(t, app.resultFloats, len(test.expectedFloats)) + for i := range app.resultFloats { + expected, actual := test.expectedFloats[i], app.resultFloats[i] + require.Equal(t, expected.metric.String(), actual.metric.String()) + require.Equal(t, expected.t, actual.t) + require.Equal(t, expected.f, actual.f) + } }) } }