diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4559d51837..feca2430ac 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -270,6 +270,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "promql-delayed-name-removal": c.promqlEnableDelayedNameRemoval = true logger.Info("Experimental PromQL delayed name removal enabled.") + case "scrape-rules": + c.scrape.EnableScrapeRules = true + logger.Info("Experimental scrape-time recording rules enabled.") case "": continue case "old-ui": @@ -744,12 +747,30 @@ func main() { os.Exit(1) } + opts := promql.EngineOpts{ + Logger: logger.With("component", "query engine"), + Reg: prometheus.DefaultRegisterer, + MaxSamples: cfg.queryMaxSamples, + Timeout: time.Duration(cfg.queryTimeout), + ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")), + LookbackDelta: time.Duration(cfg.lookbackDelta), + NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, + // EnableAtModifier and EnableNegativeOffset have to be + // always on for regular PromQL as of Prometheus v2.33. + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: cfg.enablePerStepStats, + EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval, + } + queryEngine := promql.NewEngine(opts) + scrapeManager, err := scrape.NewManager( &cfg.scrape, logger.With("component", "scrape manager"), logging.NewJSONFileLogger, fanoutStorage, prometheus.DefaultRegisterer, + queryEngine, ) if err != nil { logger.Error("failed to create a scrape manager", "err", err) @@ -758,9 +779,7 @@ func main() { var ( tracingManager = tracing.NewManager(logger) - - queryEngine *promql.Engine - ruleManager *rules.Manager + ruleManager *rules.Manager ) if cfg.maxprocsEnable { @@ -787,24 +806,6 @@ func main() { } if !agentMode { - opts := promql.EngineOpts{ - Logger: logger.With("component", "query engine"), - Reg: prometheus.DefaultRegisterer, - MaxSamples: cfg.queryMaxSamples, - Timeout: time.Duration(cfg.queryTimeout), - ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")), - LookbackDelta: time.Duration(cfg.lookbackDelta), - NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, - // EnableAtModifier and EnableNegativeOffset have to be - // always on for regular PromQL as of Prometheus v2.33. - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: cfg.enablePerStepStats, - EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval, - } - - queryEngine = promql.NewEngine(opts) - ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, Queryable: localStorage, diff --git a/config/config.go b/config/config.go index a395c3f53e..c1ed2d0658 100644 --- a/config/config.go +++ b/config/config.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage/remote/azuread" "github.com/prometheus/prometheus/storage/remote/googleiam" ) @@ -732,6 +733,8 @@ type ScrapeConfig struct { RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` // List of metric relabel configurations. MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"` + // List of rules to execute at scrape time. + RuleConfigs []*ScrapeRuleConfig `yaml:"scrape_rule_configs,omitempty"` } // SetDirectory joins any relative file paths with dir. @@ -858,6 +861,38 @@ func (c *ScrapeConfig) MarshalYAML() (interface{}, error) { return discovery.MarshalYAMLWithInlineConfigs(c) } +// ScrapeRuleConfig is the configuration for rules executed +// at scrape time for each individual target. +type ScrapeRuleConfig struct { + Expr string `yaml:"expr"` + Record string `yaml:"record"` +} + +func (a *ScrapeRuleConfig) Validate() error { + if a.Record == "" { + return errors.New("aggregation rule record must not be empty") + } + + if a.Expr == "" { + return errors.New("aggregation rule expression must not be empty") + } + + expr, err := parser.ParseExpr(a.Expr) + if err != nil { + return fmt.Errorf("invalid scrape rule expression: %w", err) + } + + parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error { + if _, ok := node.(*parser.MatrixSelector); ok { + err = errors.New("matrix selectors are not allowed in scrape rule expressions") + return err + } + return nil + }) + + return err +} + // StorageConfig configures runtime reloadable configuration options. type StorageConfig struct { TSDBConfig *TSDBConfig `yaml:"tsdb,omitempty"` diff --git a/config/config_test.go b/config/config_test.go index faca7dda12..b79c91c8a5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -16,6 +16,7 @@ package config import ( "crypto/tls" "encoding/json" + "errors" "fmt" "net/url" "os" @@ -2566,3 +2567,40 @@ func TestGetScrapeConfigs_Loaded(t *testing.T) { require.NoError(t, err) }) } + +func TestScrapeRuleConfigs(t *testing.T) { + tests := []struct { + name string + ruleConfig ScrapeRuleConfig + err error + }{ + { + name: "valid scrape rule config", + ruleConfig: ScrapeRuleConfig{ + Expr: "sum by (label) (metric)", + Record: "sum:metric:label", + }, + err: nil, + }, + { + name: "matrix selector in record expression", + ruleConfig: ScrapeRuleConfig{ + Expr: "sum by (label) (rate(metric[2m]))", + Record: "sum:metric:label", + }, + err: errors.New("matrix selectors are not allowed in scrape rule expressions"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.ruleConfig.Validate() + if test.err == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Equal(t, test.err.Error(), err.Error()) + } + }) + } +} diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 3550094ff2..25802f71dc 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -419,6 +419,11 @@ static_configs: relabel_configs: [ - ... ] +# List of rules that are executed on samples from each scrape. +# Each rule is evaluated independently and is executed after relabel_configs and before metric_relabel_configs. +scrape_rule_configs: +[ - ... ] + # List of metric relabel configurations. metric_relabel_configs: [ - ... ] @@ -2607,6 +2612,28 @@ anchored on both ends. To un-anchor the regex, use `.*.*`. Care must be taken with `labeldrop` and `labelkeep` to ensure that metrics are still uniquely labeled once the labels are removed. +### `` + +Scrape rules are applied after target relabeling and before metric relabeling. They can be used to evaluate PromQL expressions against samples from individual scrapes. + +Rules are only evaluated in the context of single scrapes and do not take into account samples from different targets, nor samples that are already ingested in Prometheus. + +Metric relabelings can be applied to series generated by scrape rules. + +A potential use of scrape rules is to aggregate away expensive series before they are ingested in Prometheus. +If the source series are not needed, they can be dropped using metric relabeling rules. + +### `` + +The syntax for a scrape rules is as follows: + +```yaml +# The name of the time series to output to. Must be a valid metric name. +record: +# The PromQL expression to evaluate. +expr: +``` + ### `` Metric relabeling is applied to samples as the last step before ingestion. It diff --git a/scrape/manager.go b/scrape/manager.go index 5ef5dccb99..cfcb8b284a 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -37,7 +39,14 @@ import ( ) // NewManager is the Manager constructor. -func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) { +func NewManager( + o *Options, + logger *slog.Logger, + newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), + app storage.Appendable, + registerer prometheus.Registerer, + queryEngine *promql.Engine, +) (*Manager, error) { if o == nil { o = &Options{} } @@ -61,6 +70,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str triggerReload: make(chan struct{}, 1), metrics: sm, buffers: pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }), + queryEngine: queryEngine, } m.metrics.setTargetMetadataCacheGatherer(m) @@ -90,6 +100,9 @@ type Options struct { // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption + // EnableScrapeRules enables rule evaluation at scrape time. + EnableScrapeRules bool + // private option for testability. skipOffsetting bool } @@ -111,6 +124,7 @@ type Manager struct { targetSets map[string][]*targetgroup.Group buffers *pool.Pool + queryEngine *promql.Engine triggerReload chan struct{} metrics *scrapeMetrics @@ -182,7 +196,7 @@ func (m *Manager) reload() { continue } m.metrics.targetScrapePools.Inc() - sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics) + sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.queryEngine, m.opts, m.metrics) if err != nil { m.metrics.targetScrapePoolsFailed.Inc() m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName) diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 96381fa736..a8ae37c673 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -521,7 +521,7 @@ scrape_configs: ) opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) newLoop := func(scrapeLoopOptions) loop { ch <- struct{}{} @@ -586,7 +586,7 @@ scrape_configs: func TestManagerTargetsUpdates(t *testing.T) { opts := Options{} testRegistry := prometheus.NewRegistry() - m, err := NewManager(&opts, nil, nil, nil, testRegistry) + m, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) ts := make(chan map[string][]*targetgroup.Group) @@ -639,7 +639,7 @@ global: opts := Options{} testRegistry := prometheus.NewRegistry() - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) // Load the first config. @@ -716,7 +716,7 @@ scrape_configs: } opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil) require.NoError(t, err) reload(scrapeManager, cfg1) @@ -1055,7 +1055,7 @@ func TestUnregisterMetrics(t *testing.T) { // Check that all metrics can be unregistered, allowing a second manager to be created. for i := 0; i < 2; i++ { opts := Options{} - manager, err := NewManager(&opts, nil, nil, nil, reg) + manager, err := NewManager(&opts, nil, nil, nil, reg, nil) require.NotNil(t, manager) require.NoError(t, err) // Unregister all metrics. @@ -1104,13 +1104,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A sdMetrics, discovery.Updatert(100*time.Millisecond), ) - scrapeManager, err := NewManager( - opts, - nil, - nil, - app, - prometheus.NewRegistry(), - ) + scrapeManager, err := NewManager(opts, nil, nil, app, prometheus.NewRegistry(), nil) require.NoError(t, err) go discoveryManager.Run() go scrapeManager.Run(discoveryManager.SyncCh()) diff --git a/scrape/rules.go b/scrape/rules.go new file mode 100644 index 0000000000..45364c224d --- /dev/null +++ b/scrape/rules.go @@ -0,0 +1,231 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scrape + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" +) + +type RuleEngine interface { + NewScrapeBatch() Batch + EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) +} + +// ruleEngine evaluates rules from individual targets at scrape time. +type ruleEngine struct { + rules []*config.ScrapeRuleConfig + engine *promql.Engine +} + +// newRuleEngine creates a new RuleEngine. +func newRuleEngine( + rules []*config.ScrapeRuleConfig, + queryEngine *promql.Engine, +) RuleEngine { + if len(rules) == 0 { + return &nopRuleEngine{} + } + + return &ruleEngine{ + rules: rules, + engine: queryEngine, + } +} + +// NewScrapeBatch creates a new Batch which can be used to add samples from a single scrape. +// Rules are always evaluated on a single Batch. +func (r *ruleEngine) NewScrapeBatch() Batch { + return &batch{ + samples: make([]Sample, 0), + } +} + +// EvaluateRules executes rules on the given Batch and returns new Samples. +func (r *ruleEngine) EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) { + var ( + result []Sample + builder = labels.NewScratchBuilder(0) + ) + for _, rule := range r.rules { + queryable := storage.QueryableFunc(func(_, _ int64) (storage.Querier, error) { + return b, nil + }) + + query, err := r.engine.NewInstantQuery(context.Background(), queryable, nil, rule.Expr, ts) + if err != nil { + return nil, err + } + + samples, err := query.Exec(context.Background()).Vector() + if err != nil { + return nil, err + } + + for _, s := range samples { + builder.Reset() + metricNameSet := false + s.Metric.Range(func(lbl labels.Label) { + if lbl.Name == labels.MetricName { + metricNameSet = true + builder.Add(labels.MetricName, rule.Record) + } else { + builder.Add(lbl.Name, lbl.Value) + } + }) + if !metricNameSet { + builder.Add(labels.MetricName, rule.Record) + } + builder.Sort() + result = append(result, Sample{ + metric: sampleMutator(builder.Labels()), + t: s.T, + f: s.F, + fh: s.H, + }) + } + } + + return result, nil +} + +// Batch is used to collect floats from a single scrape. +type Batch interface { + storage.Querier + AddFloatSample(labels.Labels, int64, float64) + AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram) +} + +type batch struct { + samples []Sample +} + +type Sample struct { + metric labels.Labels + t int64 + f float64 + fh *histogram.FloatHistogram +} + +func (b *batch) AddFloatSample(lbls labels.Labels, t int64, f float64) { + b.samples = append(b.samples, Sample{ + metric: lbls, + t: t, + f: f, + }) +} + +func (b *batch) AddHistogramSample(lbls labels.Labels, t int64, fh *histogram.FloatHistogram) { + b.samples = append(b.samples, Sample{ + metric: lbls, + t: t, + fh: fh, + }) +} + +func (b *batch) Select(_ context.Context, _ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + var samples []Sample + for _, s := range b.samples { + match := true + for _, matcher := range matchers { + if !matcher.Matches(s.metric.Get(matcher.Name)) { + match = false + break + } + } + if match { + samples = append(samples, s) + } + } + + return &seriesSet{ + i: -1, + samples: samples, + } +} + +func (b *batch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *batch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *batch) Close() error { return nil } + +type seriesSet struct { + i int + samples []Sample +} + +func (s *seriesSet) Next() bool { + s.i++ + return s.i != len(s.samples) +} + +func (s *seriesSet) At() storage.Series { + sample := s.samples[s.i] + if sample.fh != nil { + return promql.NewStorageSeries(promql.Series{ + Metric: sample.metric, + Histograms: []promql.HPoint{{T: sample.t, H: sample.fh}}, + }) + } + + return promql.NewStorageSeries(promql.Series{ + Metric: sample.metric, + Floats: []promql.FPoint{{T: sample.t, F: sample.f}}, + }) +} + +func (s *seriesSet) Err() error { return nil } +func (s *seriesSet) Warnings() annotations.Annotations { return nil } + +// nopRuleEngine does not produce any new floats when evaluating rules. +type nopRuleEngine struct{} + +func (n nopRuleEngine) NewScrapeBatch() Batch { return &nopBatch{} } + +func (n nopRuleEngine) EvaluateRules(Batch, time.Time, labelsMutator) ([]Sample, error) { + return nil, nil +} + +type nopBatch struct{} + +func (b *nopBatch) AddFloatSample(labels.Labels, int64, float64) {} + +func (b *nopBatch) AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram) { +} + +func (b *nopBatch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *nopBatch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (b *nopBatch) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { + return nil +} + +func (b *nopBatch) Close() error { return nil } diff --git a/scrape/rules_test.go b/scrape/rules_test.go new file mode 100644 index 0000000000..ba14535dfd --- /dev/null +++ b/scrape/rules_test.go @@ -0,0 +1,75 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scrape + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/promql" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" +) + +func TestRuleEngine(t *testing.T) { + now := time.Now() + samples := []Sample{ + { + metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/"), + t: now.UnixMilli(), + f: 10, + }, + { + metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/metrics"), + t: now.UnixMilli(), + f: 6, + }, + } + rules := []*config.ScrapeRuleConfig{ + { + Expr: "sum by (code) (http_requests_total)", + Record: "code:http_requests_total:sum", + }, + } + + engine := promql.NewEngine(promql.EngineOpts{ + MaxSamples: 50000000, + Timeout: 10 * time.Second, + LookbackDelta: 5 * time.Minute, + }) + re := newRuleEngine(rules, engine) + b := re.NewScrapeBatch() + for _, s := range samples { + b.AddFloatSample(s.metric, s.t, s.f) + } + + result, err := re.EvaluateRules(b, now, nopMutator) + require.NoError(t, err) + + if len(result) != 1 { + t.Fatalf("Invalid sample count, got %d, want %d", len(result), 3) + } + + expectedSamples := []Sample{ + { + metric: labels.FromStrings("__name__", "code:http_requests_total:sum", "code", "200"), + t: now.UnixMilli(), + f: 16, + }, + } + require.ElementsMatch(t, expectedSamples, result) +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 2cd3d78d53..609d39caba 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -31,6 +31,8 @@ import ( "time" "unsafe" + "github.com/prometheus/prometheus/promql" + "github.com/klauspost/compress/gzip" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -102,6 +104,8 @@ type scrapePool struct { scrapeFailureLogger FailureLogger scrapeFailureLoggerMtx sync.RWMutex + + enableScrapeRuleEval bool } type labelLimits struct { @@ -137,7 +141,16 @@ const maxAheadTime = 10 * time.Minute // returning an empty label set is interpreted as "drop". type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) { +func newScrapePool( + cfg *config.ScrapeConfig, + app storage.Appendable, + offsetSeed uint64, + logger *slog.Logger, + buffers *pool.Pool, + queryEngine *promql.Engine, + options *Options, + metrics *scrapeMetrics, +) (*scrapePool, error) { if logger == nil { logger = promslog.NewNopLogger() } @@ -160,6 +173,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed logger: logger, metrics: metrics, httpOpts: options.HTTPClientOptions, + enableScrapeRuleEval: options.EnableScrapeRules, } sp.newLoop = func(opts scrapeLoopOptions) loop { // Update the targets retrieval function for metadata to a new scrape cache. @@ -169,6 +183,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed } opts.target.SetMetadataStore(cache) + var re RuleEngine + if sp.enableScrapeRuleEval { + re = newRuleEngine(cfg.RuleConfigs, queryEngine) + } else { + re = &nopRuleEngine{} + } + return newScrapeLoop( ctx, opts.scraper, @@ -179,6 +200,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed }, func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) }, func(ctx context.Context) storage.Appender { return app.Appender(ctx) }, + re, cache, sp.symbolTable, offsetSeed, @@ -893,6 +915,7 @@ type scrapeLoop struct { l *slog.Logger scrapeFailureLogger FailureLogger scrapeFailureLoggerMtx sync.RWMutex + ruleEngine RuleEngine cache *scrapeCache lastScrapeSize int buffers *pool.Pool @@ -1200,13 +1223,15 @@ func (c *scrapeCache) LengthMetadata() int { return len(c.metadata) } -func newScrapeLoop(ctx context.Context, +func newScrapeLoop( + ctx context.Context, sc scraper, l *slog.Logger, buffers *pool.Pool, sampleMutator labelsMutator, reportSampleMutator labelsMutator, appender func(ctx context.Context) storage.Appender, + re RuleEngine, cache *scrapeCache, symbolTable *labels.SymbolTable, offsetSeed uint64, @@ -1256,6 +1281,7 @@ func newScrapeLoop(ctx context.Context, sl := &scrapeLoop{ scraper: sc, buffers: buffers, + ruleEngine: re, cache: cache, appender: appender, symbolTable: symbolTable, @@ -1648,6 +1674,9 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, sl.cache.iterDone(true) }() + // Make a new batch for evaluating scrape rules + scrapeBatch := sl.ruleEngine.NewScrapeBatch() + loop: for { var ( @@ -1700,15 +1729,11 @@ loop: t = *parsedTimestamp } - if sl.cache.getDropped(met) { - continue - } - ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met) var ( ref storage.SeriesRef hash uint64 ) - + ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met) if seriesCached { ref = ce.ref lset = ce.lset @@ -1716,31 +1741,44 @@ loop: } else { p.Labels(&lset) hash = lset.Hash() - - // Hash label set as it is seen local to the target. Then add target labels - // and relabeling and store the final label set. - lset = sl.sampleMutator(lset) - - // The label set may be set to empty to indicate dropping. - if lset.IsEmpty() { - sl.cache.addDropped(met) - continue + } + if isHistogram && sl.enableNativeHistogramIngestion { + if h != nil { + scrapeBatch.AddHistogramSample(lset, t, h.ToFloat(nil)) + } else { + scrapeBatch.AddHistogramSample(lset, t, fh) } + } else { + scrapeBatch.AddFloatSample(lset, t, val) + } - if !lset.Has(labels.MetricName) { - err = errNameLabelMandatory - break loop - } - if !lset.IsValid(sl.validationScheme) { - err = fmt.Errorf("invalid metric name or label names: %s", lset.String()) - break loop - } + if sl.cache.getDropped(met) { + continue + } - // If any label limits is exceeded the scrape should fail. - if err = verifyLabelLimits(lset, sl.labelLimits); err != nil { - sl.metrics.targetScrapePoolExceededLabelLimits.Inc() - break loop - } + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + lset = sl.sampleMutator(lset) + + // The label set may be set to empty to indicate dropping. + if lset.IsEmpty() { + sl.cache.addDropped(met) + continue + } + + if !lset.Has(labels.MetricName) { + err = errNameLabelMandatory + break loop + } + if !lset.IsValid(sl.validationScheme) { + err = fmt.Errorf("invalid metric name or label names: %s", lset.String()) + break loop + } + + // If any label limits is exceeded the scrape should fail. + if err = verifyLabelLimits(lset, sl.labelLimits); err != nil { + sl.metrics.targetScrapePoolExceededLabelLimits.Inc() + break loop } if seriesAlreadyScraped && parsedTimestamp == nil { @@ -1802,8 +1840,8 @@ loop: } // Increment added even if there's an error so we correctly report the - // number of samples remaining after relabeling. - // We still report duplicated samples here since this number should be the exact number + // number of floats remaining after relabeling. + // We still report duplicated floats here since this number should be the exact number // of time series exposed on a scrape after relabelling. added++ exemplars = exemplars[:0] // Reset and reuse the exemplar slice. @@ -1889,9 +1927,41 @@ loop: if appErrs.numExemplarOutOfOrder > 0 { sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) } - if err == nil { - err = sl.updateStaleMarkers(app, defTime) + if err != nil { + return } + if err = sl.updateStaleMarkers(app, defTime); err != nil { + return + } + + ruleSamples, ruleErr := sl.ruleEngine.EvaluateRules(scrapeBatch, ts, sl.sampleMutator) + if ruleErr != nil { + err = ruleErr + return + } + var recordBuf []byte + for _, s := range ruleSamples { + recordBuf = recordBuf[:0] + added++ + var ( + ce *cacheEntry + ok bool + ) + recordBytes := s.metric.Bytes(recordBuf) + ce, ok, _ = sl.cache.get(recordBytes) + if ok { + _, err = app.Append(ce.ref, s.metric, s.t, s.f) + if err != nil { + return + } + } else { + var ref storage.SeriesRef + ref, err = app.Append(0, s.metric, s.t, s.f) + sl.cache.addRef(recordBytes, ref, s.metric, s.metric.Hash()) + seriesAdded++ + } + } + return } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 6bb140cca8..0e09003d82 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -34,6 +34,8 @@ import ( "text/template" "time" + "github.com/prometheus/prometheus/promql" + "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/grafana/regexp" @@ -84,7 +86,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) ) a, ok := sp.appendable.(*nopAppendable) @@ -339,7 +341,7 @@ func TestDroppedTargetsList(t *testing.T) { }, }, } - sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}" expectedLength = 2 ) @@ -509,7 +511,7 @@ func TestScrapePoolReload(t *testing.T) { mtx.Lock() targetScraper := opts.scraper.(*targetScraper) - require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper) + require.True(t, stopped[targetScraper.hash()], "Scrape loop for %f not stopped yet", targetScraper) mtx.Unlock() } return l @@ -778,7 +780,7 @@ func TestScrapePoolTargetLimit(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ := newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) loop := sp.newLoop(scrapeLoopOptions{ target: &Target{}, @@ -851,7 +853,7 @@ func TestScrapePoolRaces(t *testing.T) { newConfig := func() *config.ScrapeConfig { return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} } - sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{ @@ -937,35 +939,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app } func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop { - return newScrapeLoop(ctx, - scraper, - nil, nil, - nopMutator, - nopMutator, - app, - nil, - labels.NewSymbolTable(), - 0, - true, - false, - true, - 0, 0, histogram.ExponentialSchemaMax, - nil, - interval, - time.Hour, - false, - false, - false, - false, - false, - true, - nil, - false, - newTestScrapeMetrics(t), - false, - model.LegacyValidation, - fallback, - ) + return newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, interval, time.Hour, false, false, false, false, false, true, nil, false, newTestScrapeMetrics(t), false, model.LegacyValidation, fallback) } func TestScrapeLoopStopBeforeRun(t *testing.T) { @@ -1083,35 +1057,7 @@ func TestScrapeLoopRun(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, - scraper, - nil, nil, - nopMutator, - nopMutator, - app, - nil, - nil, - 0, - true, - false, - true, - 0, 0, histogram.ExponentialSchemaMax, - nil, - time.Second, - time.Hour, - false, - false, - false, - false, - false, - false, - nil, - false, - scrapeMetrics, - false, - model.LegacyValidation, - "", - ) + sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, nil, 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, time.Second, time.Hour, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "") // The loop must terminate during the initial offset if the context // is canceled. @@ -1230,35 +1176,7 @@ func TestScrapeLoopMetadata(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, - scraper, - nil, nil, - nopMutator, - nopMutator, - func(_ context.Context) storage.Appender { return nopAppender{} }, - cache, - labels.NewSymbolTable(), - 0, - true, - false, - true, - 0, 0, histogram.ExponentialSchemaMax, - nil, - 0, - 0, - false, - false, - false, - false, - false, - false, - nil, - false, - scrapeMetrics, - false, - model.LegacyValidation, - "", - ) + sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, func(_ context.Context) storage.Appender { return nopAppender{} }, nopRuleEngine{}, cache, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, 0, 0, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "") defer cancel() slApp := sl.appender(ctx) @@ -3382,7 +3300,7 @@ func TestReuseScrapeCache(t *testing.T) { ScrapeInterval: model.Duration(5 * time.Second), MetricsPath: "/metrics", } - sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) t1 = &Target{ labels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"), scrapeConfig: &config.ScrapeConfig{}, @@ -3567,7 +3485,7 @@ func TestReuseCacheRace(t *testing.T) { MetricsPath: "/metrics", } buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) - sp, _ = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t)) + sp, _ = newScrapePool(cfg, app, 0, nil, buffers, nil, &Options{}, newTestScrapeMetrics(t)) t1 = &Target{ labels: labels.FromStrings("labelNew", "nameNew"), scrapeConfig: &config.ScrapeConfig{}, @@ -3668,7 +3586,7 @@ func TestScrapeReportLimit(t *testing.T) { ts, scrapedTwice := newScrapableServer("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n") defer ts.Close() - sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -3724,7 +3642,7 @@ func TestScrapeUTF8(t *testing.T) { ts, scrapedTwice := newScrapableServer("{\"with.dots\"} 42\n") defer ts.Close() - sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -3871,7 +3789,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) { }, }, } - sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}}, @@ -3953,7 +3871,7 @@ test_summary_count 199 ts, scrapedTwice := newScrapableServer(metricsText) defer ts.Close() - sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -4297,7 +4215,7 @@ metric: < foundLeValues[v] = true } - require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %v but found %v", expectedValues, foundLeValues) + require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %f but found %f", expectedValues, foundLeValues) for _, v := range expectedValues { require.Contains(t, foundLeValues, v, "label value not found") } @@ -4455,7 +4373,7 @@ metric: < })) defer ts.Close() - sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -4603,7 +4521,7 @@ func TestScrapeLoopCompression(t *testing.T) { EnableCompression: tc.enableCompression, } - sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) defer sp.stop() @@ -4885,7 +4803,7 @@ scrape_configs: s.DB.EnableNativeHistograms() reg := prometheus.NewRegistry() - mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg) + mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg, nil) require.NoError(t, err) cfg, err := config.Load(configStr, promslog.NewNopLogger()) require.NoError(t, err) @@ -4928,7 +4846,7 @@ scrape_configs: it := series.Iterator(nil) for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { if vt != chunkenc.ValHistogram { - // don't care about other samples + // don't care about other floats continue } _, h := it.AtHistogram(nil) @@ -4987,7 +4905,7 @@ func TestTargetScrapeConfigWithLabels(t *testing.T) { } } - sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) t.Cleanup(sp.stop) @@ -5138,7 +5056,7 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) { }, } - p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) + p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) t.Cleanup(p.stop) @@ -5153,3 +5071,224 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) { <-time.After(1 * time.Second) } + +func TestScrapeWithScrapeRules(t *testing.T) { + ts := time.Now() + + tests := []struct { + name string + scrape string + discoveryLabels []string + scrapeRules []*config.ScrapeRuleConfig + relabelConfig []*relabel.Config + expectedFloats []floatSample + }{ + { + name: "scrape rules without relabeling", + scrape: ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5`, + discoveryLabels: []string{"instance", "local"}, + scrapeRules: []*config.ScrapeRuleConfig{ + { + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + }, + }, + relabelConfig: nil, + expectedFloats: []floatSample{ + { + metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "1"), + t: ts.UnixMilli(), + f: 3, + }, + { + metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "2"), + t: ts.UnixMilli(), + f: 5, + }, + { + metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"), + t: ts.UnixMilli(), + f: 8, + }, + }, + }, + { + name: "scrape rules with relabeling on original series", + scrape: ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5`, + discoveryLabels: []string{"instance", "local"}, + scrapeRules: []*config.ScrapeRuleConfig{ + { + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + }, + }, + relabelConfig: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("metric"), + Action: relabel.Drop, + }, + }, + expectedFloats: []floatSample{ + { + metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"), + t: ts.UnixMilli(), + f: 8, + }, + }, + }, + { + name: "scrape rules with relabeling on recorded series", + scrape: ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5`, + discoveryLabels: []string{"instance", "local"}, + scrapeRules: []*config.ScrapeRuleConfig{ + { + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + }, + }, + relabelConfig: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("l1:metric:sum"), + Action: relabel.Keep, + }, + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("l1:metric:sum"), + Action: relabel.Replace, + TargetLabel: "__name__", + Replacement: "relabeled_rule", + }, + }, + expectedFloats: []floatSample{ + { + metric: labels.FromStrings("__name__", "relabeled_rule", "instance", "local", "l1", "1"), + t: ts.UnixMilli(), + f: 8, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var ( + app = &collectResultAppender{} + re = newRuleEngine(test.scrapeRules, promql.NewEngine( + promql.EngineOpts{ + MaxSamples: 500000, + Timeout: 30 * time.Second, + }), + ) + target = &Target{ + labels: labels.FromStrings(test.discoveryLabels...), + } + ) + sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender { return app }, 0) + sl.sampleMutator = func(l labels.Labels) labels.Labels { + return mutateSampleLabels(l, target, false, test.relabelConfig) + } + sl.reportSampleMutator = func(l labels.Labels) labels.Labels { + return mutateReportSampleLabels(l, target) + } + sl.ruleEngine = re + + slApp := sl.appender(context.Background()) + _, _, _, err := sl.append(slApp, []byte(test.scrape), "text/plain", ts) + require.NoError(t, err) + + require.NoError(t, slApp.Commit()) + require.Len(t, app.resultFloats, len(test.expectedFloats)) + for i := range app.resultFloats { + expected, actual := test.expectedFloats[i], app.resultFloats[i] + require.Equal(t, expected.metric.String(), actual.metric.String()) + require.Equal(t, expected.t, actual.t) + require.Equal(t, expected.f, actual.f) + } + }) + } +} + +func TestScrapeReportWithScrapeRules(t *testing.T) { + ts := time.Now() + + scrapeRule := config.ScrapeRuleConfig{ + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + } + + scrape := ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5` + + _, sl := simpleTestScrapeLoop(t) + sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine( + promql.EngineOpts{ + MaxSamples: 500000, + Timeout: 30 * time.Second, + }, + )) + + slApp := sl.appender(context.Background()) + scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 3, added) + require.Equal(t, 3, seriesAdded) + + scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 3, added) + require.Equal(t, 0, seriesAdded) +} + +func TestScrapeReportWithScrapeRulesAndRelabeling(t *testing.T) { + ts := time.Now() + + scrapeRule := config.ScrapeRuleConfig{ + Expr: "sum by (l1) (metric)", + Record: "l1:metric:sum", + } + + scrape := ` +metric{l1="1", l2="1"} 3 +metric{l1="1", l2="2"} 5` + + _, sl := simpleTestScrapeLoop(t) + sl.sampleMutator = func(l labels.Labels) labels.Labels { + return mutateSampleLabels(l, &Target{}, false, []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("metric"), + Action: relabel.Drop, + }, + }) + } + sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine( + promql.EngineOpts{ + MaxSamples: 500000, + Timeout: 30 * time.Second, + }, + )) + + slApp := sl.appender(context.Background()) + scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 1, added) + require.Equal(t, 1, seriesAdded) + + scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts) + require.NoError(t, err) + require.Equal(t, 2, scraped) + require.Equal(t, 1, added) + require.Equal(t, 0, seriesAdded) +}