Implement scrape-time rule evaluation

This commit extends Prometheus with the option to run aggregation rules
at scrape time, before metric relabeling takes place.

Each rule is evaluated independently for every target, against only the
samples from that target's individual scrape. Newly calculated samples
are committed to the TSDB atomically together with all other samples
from that scrape.

Metric relabeling can also be applied to series recorded by scrape rules.
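
Scrape rules are configured per scrape config and are gated behind the new
`scrape-rules` feature flag. A minimal, illustrative configuration sketch
(job, metric, and label names are made up):

```yaml
scrape_configs:
  - job_name: example
    scrape_rule_configs:
      - record: code:http_requests_total:sum
        expr: sum by (code) (http_requests_total)
```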

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
Filip Petkovski 2025-03-05 12:15:09 +01:00
parent 9e5d59b777
commit 2ccea83913
10 changed files with 791 additions and 173 deletions


@ -270,6 +270,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "promql-delayed-name-removal":
c.promqlEnableDelayedNameRemoval = true
logger.Info("Experimental PromQL delayed name removal enabled.")
case "scrape-rules":
c.scrape.EnableScrapeRules = true
logger.Info("Experimental scrape-time recording rules enabled.")
case "":
continue
case "old-ui":
@ -744,12 +747,30 @@ func main() {
os.Exit(1)
}
opts := promql.EngineOpts{
Logger: logger.With("component", "query engine"),
Reg: prometheus.DefaultRegisterer,
MaxSamples: cfg.queryMaxSamples,
Timeout: time.Duration(cfg.queryTimeout),
ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")),
LookbackDelta: time.Duration(cfg.lookbackDelta),
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
// EnableAtModifier and EnableNegativeOffset have to be
// always on for regular PromQL as of Prometheus v2.33.
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: cfg.enablePerStepStats,
EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval,
}
queryEngine := promql.NewEngine(opts)
scrapeManager, err := scrape.NewManager(
&cfg.scrape,
logger.With("component", "scrape manager"),
logging.NewJSONFileLogger,
fanoutStorage,
prometheus.DefaultRegisterer,
queryEngine,
)
if err != nil {
logger.Error("failed to create a scrape manager", "err", err)
@ -758,9 +779,7 @@ func main() {
var (
tracingManager = tracing.NewManager(logger)
queryEngine *promql.Engine
ruleManager *rules.Manager
ruleManager *rules.Manager
)
if cfg.maxprocsEnable {
@ -787,24 +806,6 @@ func main() {
}
if !agentMode {
opts := promql.EngineOpts{
Logger: logger.With("component", "query engine"),
Reg: prometheus.DefaultRegisterer,
MaxSamples: cfg.queryMaxSamples,
Timeout: time.Duration(cfg.queryTimeout),
ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")),
LookbackDelta: time.Duration(cfg.lookbackDelta),
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
// EnableAtModifier and EnableNegativeOffset have to be
// always on for regular PromQL as of Prometheus v2.33.
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: cfg.enablePerStepStats,
EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval,
}
queryEngine = promql.NewEngine(opts)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,


@ -16,6 +16,7 @@ package config
import (
"errors"
"fmt"
"github.com/prometheus/prometheus/promql/parser"
"log/slog"
"mime"
"net/url"
@ -732,6 +733,8 @@ type ScrapeConfig struct {
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
// List of metric relabel configurations.
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
// List of rules to execute at scrape time.
RuleConfigs []*ScrapeRuleConfig `yaml:"scrape_rule_configs,omitempty"`
}
// SetDirectory joins any relative file paths with dir.
@ -858,6 +861,38 @@ func (c *ScrapeConfig) MarshalYAML() (interface{}, error) {
return discovery.MarshalYAMLWithInlineConfigs(c)
}
// ScrapeRuleConfig is the configuration for rules executed
// at scrape time for each individual target.
type ScrapeRuleConfig struct {
// Expr is the PromQL expression to evaluate at scrape time.
Expr string `yaml:"expr"`
// Record is the name of the time series to output to. Must be a valid metric name.
Record string `yaml:"record"`
}
func (a *ScrapeRuleConfig) Validate() error {
if a.Record == "" {
return errors.New("aggregation rule record must not be empty")
}
if a.Expr == "" {
return errors.New("aggregation rule expression must not be empty")
}
expr, err := parser.ParseExpr(a.Expr)
if err != nil {
return fmt.Errorf("invalid scrape rule expression: %w", err)
}
parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error {
if _, ok := node.(*parser.MatrixSelector); ok {
err = errors.New("matrix selectors are not allowed in scrape rule expressions")
return err
}
return nil
})
return err
}
// StorageConfig configures runtime reloadable configuration options.
type StorageConfig struct {
TSDBConfig *TSDBConfig `yaml:"tsdb,omitempty"`


@ -16,6 +16,7 @@ package config
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
@ -2566,3 +2567,40 @@ func TestGetScrapeConfigs_Loaded(t *testing.T) {
require.NoError(t, err)
})
}
func TestScrapeRuleConfigs(t *testing.T) {
tests := []struct {
name string
ruleConfig ScrapeRuleConfig
err error
}{
{
name: "valid scrape rule config",
ruleConfig: ScrapeRuleConfig{
Expr: "sum by (label) (metric)",
Record: "sum:metric:label",
},
err: nil,
},
{
name: "matrix selector in record expression",
ruleConfig: ScrapeRuleConfig{
Expr: "sum by (label) (rate(metric[2m]))",
Record: "sum:metric:label",
},
err: errors.New("matrix selectors are not allowed in scrape rule expressions"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.ruleConfig.Validate()
if test.err == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Equal(t, test.err.Error(), err.Error())
}
})
}
}


@ -419,6 +419,11 @@ static_configs:
relabel_configs:
[ - <relabel_config> ... ]
# List of rules that are executed on samples from each scrape.
# Each rule is evaluated independently and is executed after relabel_configs and before metric_relabel_configs.
scrape_rule_configs:
[ - <scrape_rule_config> ... ]
# List of metric relabel configurations.
metric_relabel_configs:
[ - <relabel_config> ... ]
@ -2607,6 +2612,28 @@ anchored on both ends. To un-anchor the regex, use `.*<regex>.*`.
Care must be taken with `labeldrop` and `labelkeep` to ensure that metrics are
still uniquely labeled once the labels are removed.
### `<scrape_rule_configs>`
Scrape rules are applied after target relabeling and before metric relabeling. They can be used to evaluate PromQL expressions against the samples from an individual scrape.
Rules are evaluated only in the context of a single scrape; they do not take into account samples from other targets, nor samples that have already been ingested into Prometheus.
Metric relabeling can also be applied to series generated by scrape rules.
A potential use of scrape rules is to aggregate away expensive series before they are ingested into Prometheus.
If the source series are not needed, they can be dropped with metric relabeling rules, as shown in the example below.
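For example, the following illustrative configuration (job, metric, and label names are made up) records a per-handler sum at scrape time and drops the expensive source series with a metric relabeling rule, so that only the aggregated series is ingested:
```yaml
scrape_configs:
  - job_name: "web"
    static_configs:
      - targets: ["localhost:8080"]
    scrape_rule_configs:
      # Aggregate away all labels except "handler" at scrape time.
      - record: handler:http_requests_total:sum
        expr: sum by (handler) (http_requests_total)
    metric_relabel_configs:
      # Drop the expensive source series; the recorded series is kept.
      - source_labels: [__name__]
        regex: http_requests_total
        action: drop
```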
### `<scrape_rule_config>`
The syntax for a scrape rule is as follows:
```yaml
# The name of the time series to output to. Must be a valid metric name.
record: <string>
# The PromQL expression to evaluate.
expr: <string>
```
### `<metric_relabel_configs>`
Metric relabeling is applied to samples as the last step before ingestion. It


@ -22,6 +22,8 @@ import (
"sync"
"time"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@ -37,7 +39,14 @@ import (
)
// NewManager is the Manager constructor.
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
func NewManager(
o *Options,
logger *slog.Logger,
newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error),
app storage.Appendable,
registerer prometheus.Registerer,
queryEngine *promql.Engine,
) (*Manager, error) {
if o == nil {
o = &Options{}
}
@ -61,6 +70,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
triggerReload: make(chan struct{}, 1),
metrics: sm,
buffers: pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }),
queryEngine: queryEngine,
}
m.metrics.setTargetMetadataCacheGatherer(m)
@ -90,6 +100,9 @@ type Options struct {
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
// EnableScrapeRules enables rule evaluation at scrape time.
EnableScrapeRules bool
// private option for testability.
skipOffsetting bool
}
@ -111,6 +124,7 @@ type Manager struct {
targetSets map[string][]*targetgroup.Group
buffers *pool.Pool
queryEngine *promql.Engine
triggerReload chan struct{}
metrics *scrapeMetrics
@ -182,7 +196,7 @@ func (m *Manager) reload() {
continue
}
m.metrics.targetScrapePools.Inc()
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.queryEngine, m.opts, m.metrics)
if err != nil {
m.metrics.targetScrapePoolsFailed.Inc()
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)


@ -521,7 +521,7 @@ scrape_configs:
)
opts := Options{}
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
require.NoError(t, err)
newLoop := func(scrapeLoopOptions) loop {
ch <- struct{}{}
@ -586,7 +586,7 @@ scrape_configs:
func TestManagerTargetsUpdates(t *testing.T) {
opts := Options{}
testRegistry := prometheus.NewRegistry()
m, err := NewManager(&opts, nil, nil, nil, testRegistry)
m, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
require.NoError(t, err)
ts := make(chan map[string][]*targetgroup.Group)
@ -639,7 +639,7 @@ global:
opts := Options{}
testRegistry := prometheus.NewRegistry()
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
require.NoError(t, err)
// Load the first config.
@ -716,7 +716,7 @@ scrape_configs:
}
opts := Options{}
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
require.NoError(t, err)
reload(scrapeManager, cfg1)
@ -1055,7 +1055,7 @@ func TestUnregisterMetrics(t *testing.T) {
// Check that all metrics can be unregistered, allowing a second manager to be created.
for i := 0; i < 2; i++ {
opts := Options{}
manager, err := NewManager(&opts, nil, nil, nil, reg)
manager, err := NewManager(&opts, nil, nil, nil, reg, nil)
require.NotNil(t, manager)
require.NoError(t, err)
// Unregister all metrics.
@ -1104,13 +1104,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
sdMetrics,
discovery.Updatert(100*time.Millisecond),
)
scrapeManager, err := NewManager(
opts,
nil,
nil,
app,
prometheus.NewRegistry(),
)
scrapeManager, err := NewManager(opts, nil, nil, app, prometheus.NewRegistry(), nil)
require.NoError(t, err)
go discoveryManager.Run()
go scrapeManager.Run(discoveryManager.SyncCh())

scrape/rules.go (new file, 231 lines)

@ -0,0 +1,231 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"context"
"time"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
)
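// RuleEngine evaluates scrape rules against the samples collected from a single scrape.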
type RuleEngine interface {
NewScrapeBatch() Batch
EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error)
}
// ruleEngine evaluates rules from individual targets at scrape time.
type ruleEngine struct {
rules []*config.ScrapeRuleConfig
engine *promql.Engine
}
// newRuleEngine creates a new RuleEngine.
func newRuleEngine(
rules []*config.ScrapeRuleConfig,
queryEngine *promql.Engine,
) RuleEngine {
if len(rules) == 0 {
return &nopRuleEngine{}
}
return &ruleEngine{
rules: rules,
engine: queryEngine,
}
}
// NewScrapeBatch creates a new Batch which can be used to add samples from a single scrape.
// Rules are always evaluated on a single Batch.
func (r *ruleEngine) NewScrapeBatch() Batch {
return &batch{
samples: make([]Sample, 0),
}
}
// EvaluateRules executes rules on the given Batch and returns new Samples.
func (r *ruleEngine) EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) {
var (
result []Sample
builder labels.ScratchBuilder
)
for _, rule := range r.rules {
queryable := storage.QueryableFunc(func(_, _ int64) (storage.Querier, error) {
return b, nil
})
query, err := r.engine.NewInstantQuery(context.Background(), queryable, nil, rule.Expr, ts)
if err != nil {
return nil, err
}
samples, err := query.Exec(context.Background()).Vector()
if err != nil {
return nil, err
}
for _, s := range samples {
builder.Reset()
metricNameSet := false
s.Metric.Range(func(lbl labels.Label) {
if lbl.Name == labels.MetricName {
metricNameSet = true
builder.Add(labels.MetricName, rule.Record)
} else {
builder.Add(lbl.Name, lbl.Value)
}
})
if !metricNameSet {
builder.Add(labels.MetricName, rule.Record)
}
builder.Sort()
result = append(result, Sample{
metric: sampleMutator(builder.Labels()),
t: s.T,
f: s.F,
fh: s.H,
})
}
}
return result, nil
}
// Batch is used to collect samples from a single scrape.
type Batch interface {
storage.Querier
AddFloatSample(labels.Labels, int64, float64)
AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram)
}
type batch struct {
samples []Sample
}
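// Sample is a single sample collected from a scrape or produced by a scrape rule.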
type Sample struct {
metric labels.Labels
t int64
f float64
fh *histogram.FloatHistogram
}
func (b *batch) AddFloatSample(lbls labels.Labels, t int64, f float64) {
b.samples = append(b.samples, Sample{
metric: lbls,
t: t,
f: f,
})
}
func (b *batch) AddHistogramSample(lbls labels.Labels, t int64, fh *histogram.FloatHistogram) {
b.samples = append(b.samples, Sample{
metric: lbls,
t: t,
fh: fh,
})
}
func (b *batch) Select(_ context.Context, _ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
var samples []Sample
for _, s := range b.samples {
match := true
for _, matcher := range matchers {
if !matcher.Matches(s.metric.Get(matcher.Name)) {
match = false
break
}
}
if match {
samples = append(samples, s)
}
}
return &seriesSet{
i: -1,
samples: samples,
}
}
func (b *batch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
func (b *batch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
func (b *batch) Close() error { return nil }
type seriesSet struct {
i int
samples []Sample
}
func (s *seriesSet) Next() bool {
s.i++
return s.i != len(s.samples)
}
func (s *seriesSet) At() storage.Series {
sample := s.samples[s.i]
if sample.fh != nil {
return promql.NewStorageSeries(promql.Series{
Metric: sample.metric,
Histograms: []promql.HPoint{{T: sample.t, H: sample.fh}},
})
} else {
return promql.NewStorageSeries(promql.Series{
Metric: sample.metric,
Floats: []promql.FPoint{{T: sample.t, F: sample.f}},
})
}
}
func (s *seriesSet) Err() error { return nil }
func (s *seriesSet) Warnings() annotations.Annotations { return nil }
// nopRuleEngine does not produce any new samples when evaluating rules.
type nopRuleEngine struct{}
func (n nopRuleEngine) NewScrapeBatch() Batch { return &nopBatch{} }
func (n nopRuleEngine) EvaluateRules(Batch, time.Time, labelsMutator) ([]Sample, error) {
return nil, nil
}
type nopBatch struct{}
func (b *nopBatch) AddFloatSample(labels.Labels, int64, float64) {}
func (b *nopBatch) AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram) {
}
func (b *nopBatch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
func (b *nopBatch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
func (b *nopBatch) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return nil
}
func (b *nopBatch) Close() error { return nil }

scrape/rules_test.go (new file, 75 lines)

@ -0,0 +1,75 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"testing"
"time"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
)
func TestRuleEngine(t *testing.T) {
now := time.Now()
samples := []Sample{
{
metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/"),
t: now.UnixMilli(),
f: 10,
},
{
metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/metrics"),
t: now.UnixMilli(),
f: 6,
},
}
rules := []*config.ScrapeRuleConfig{
{
Expr: "sum by (code) (http_requests_total)",
Record: "code:http_requests_total:sum",
},
}
engine := promql.NewEngine(promql.EngineOpts{
MaxSamples: 50000000,
Timeout: 10 * time.Second,
LookbackDelta: 5 * time.Minute,
})
re := newRuleEngine(rules, engine)
b := re.NewScrapeBatch()
for _, s := range samples {
b.AddFloatSample(s.metric, s.t, s.f)
}
result, err := re.EvaluateRules(b, now, nopMutator)
require.NoError(t, err)
if len(result) != 1 {
t.Fatalf("Invalid sample count, got %d, want %d", len(result), 3)
}
expectedSamples := []Sample{
{
metric: labels.FromStrings("__name__", "code:http_requests_total:sum", "code", "200"),
t: now.UnixMilli(),
f: 16,
},
}
require.ElementsMatch(t, expectedSamples, result)
}


@ -31,6 +31,8 @@ import (
"time"
"unsafe"
"github.com/prometheus/prometheus/promql"
"github.com/klauspost/compress/gzip"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@ -102,6 +104,8 @@ type scrapePool struct {
scrapeFailureLogger FailureLogger
scrapeFailureLoggerMtx sync.RWMutex
enableScrapeRuleEval bool
}
type labelLimits struct {
@ -137,7 +141,16 @@ const maxAheadTime = 10 * time.Minute
// returning an empty label set is interpreted as "drop".
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
func newScrapePool(
cfg *config.ScrapeConfig,
app storage.Appendable,
offsetSeed uint64,
logger *slog.Logger,
buffers *pool.Pool,
queryEngine *promql.Engine,
options *Options,
metrics *scrapeMetrics,
) (*scrapePool, error) {
if logger == nil {
logger = promslog.NewNopLogger()
}
@ -160,6 +173,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
logger: logger,
metrics: metrics,
httpOpts: options.HTTPClientOptions,
enableScrapeRuleEval: options.EnableScrapeRules,
}
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
@ -169,6 +183,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
}
opts.target.SetMetadataStore(cache)
var re RuleEngine
if sp.enableScrapeRuleEval {
re = newRuleEngine(cfg.RuleConfigs, queryEngine)
} else {
re = &nopRuleEngine{}
}
return newScrapeLoop(
ctx,
opts.scraper,
@ -179,6 +200,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
},
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func(ctx context.Context) storage.Appender { return app.Appender(ctx) },
re,
cache,
sp.symbolTable,
offsetSeed,
@ -893,6 +915,7 @@ type scrapeLoop struct {
l *slog.Logger
scrapeFailureLogger FailureLogger
scrapeFailureLoggerMtx sync.RWMutex
ruleEngine RuleEngine
cache *scrapeCache
lastScrapeSize int
buffers *pool.Pool
@ -1200,13 +1223,15 @@ func (c *scrapeCache) LengthMetadata() int {
return len(c.metadata)
}
func newScrapeLoop(ctx context.Context,
func newScrapeLoop(
ctx context.Context,
sc scraper,
l *slog.Logger,
buffers *pool.Pool,
sampleMutator labelsMutator,
reportSampleMutator labelsMutator,
appender func(ctx context.Context) storage.Appender,
re RuleEngine,
cache *scrapeCache,
symbolTable *labels.SymbolTable,
offsetSeed uint64,
@ -1256,6 +1281,7 @@ func newScrapeLoop(ctx context.Context,
sl := &scrapeLoop{
scraper: sc,
buffers: buffers,
ruleEngine: re,
cache: cache,
appender: appender,
symbolTable: symbolTable,
@ -1648,6 +1674,9 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
sl.cache.iterDone(true)
}()
// Make a new batch for evaluating scrape rules
scrapeBatch := sl.ruleEngine.NewScrapeBatch()
loop:
for {
var (
@ -1700,15 +1729,11 @@ loop:
t = *parsedTimestamp
}
if sl.cache.getDropped(met) {
continue
}
ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
var (
ref storage.SeriesRef
hash uint64
)
ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
if seriesCached {
ref = ce.ref
lset = ce.lset
@ -1716,31 +1741,44 @@ loop:
} else {
p.Labels(&lset)
hash = lset.Hash()
// Hash label set as it is seen local to the target. Then add target labels
// and relabeling and store the final label set.
lset = sl.sampleMutator(lset)
// The label set may be set to empty to indicate dropping.
if lset.IsEmpty() {
sl.cache.addDropped(met)
continue
}
if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
scrapeBatch.AddHistogramSample(lset, t, h.ToFloat(nil))
} else {
scrapeBatch.AddHistogramSample(lset, t, fh)
}
} else {
scrapeBatch.AddFloatSample(lset, t, val)
}
if !lset.Has(labels.MetricName) {
err = errNameLabelMandatory
break loop
}
if !lset.IsValid(sl.validationScheme) {
err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
break loop
}
if sl.cache.getDropped(met) {
continue
}
// If any label limits is exceeded the scrape should fail.
if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
break loop
}
// Hash label set as it is seen local to the target. Then add target labels
// and relabeling and store the final label set.
lset = sl.sampleMutator(lset)
// The label set may be set to empty to indicate dropping.
if lset.IsEmpty() {
sl.cache.addDropped(met)
continue
}
if !lset.Has(labels.MetricName) {
err = errNameLabelMandatory
break loop
}
if !lset.IsValid(sl.validationScheme) {
err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
break loop
}
// If any label limits is exceeded the scrape should fail.
if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
break loop
}
if seriesAlreadyScraped && parsedTimestamp == nil {
@ -1802,8 +1840,8 @@ loop:
}
// Increment added even if there's an error so we correctly report the
// number of samples remaining after relabeling.
// We still report duplicated samples here since this number should be the exact number
// number of floats remaining after relabeling.
// We still report duplicated floats here since this number should be the exact number
// of time series exposed on a scrape after relabelling.
added++
exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
@ -1889,9 +1927,41 @@ loop:
if appErrs.numExemplarOutOfOrder > 0 {
sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
}
if err == nil {
err = sl.updateStaleMarkers(app, defTime)
if err != nil {
return
}
if err = sl.updateStaleMarkers(app, defTime); err != nil {
return
}
ruleSamples, ruleErr := sl.ruleEngine.EvaluateRules(scrapeBatch, ts, sl.sampleMutator)
if ruleErr != nil {
err = ruleErr
return
}
var recordBuf []byte
for _, s := range ruleSamples {
recordBuf = recordBuf[:0]
added++
var (
ce *cacheEntry
ok bool
)
recordBytes := s.metric.Bytes(recordBuf)
ce, ok, _ = sl.cache.get(recordBytes)
if ok {
_, err = app.Append(ce.ref, s.metric, s.t, s.f)
if err != nil {
return
}
} else {
var ref storage.SeriesRef
ref, err = app.Append(0, s.metric, s.t, s.f)
sl.cache.addRef(recordBytes, ref, s.metric, s.metric.Hash())
seriesAdded++
}
}
return
}


@ -34,6 +34,8 @@ import (
"text/template"
"time"
"github.com/prometheus/prometheus/promql"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/grafana/regexp"
@ -84,7 +86,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
)
a, ok := sp.appendable.(*nopAppendable)
@ -339,7 +341,7 @@ func TestDroppedTargetsList(t *testing.T) {
},
},
}
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
expectedLength = 2
)
@ -509,7 +511,7 @@ func TestScrapePoolReload(t *testing.T) {
mtx.Lock()
targetScraper := opts.scraper.(*targetScraper)
require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper)
require.True(t, stopped[targetScraper.hash()], "Scrape loop for %f not stopped yet", targetScraper)
mtx.Unlock()
}
return l
@ -778,7 +780,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
loop := sp.newLoop(scrapeLoopOptions{
target: &Target{},
@ -851,7 +853,7 @@ func TestScrapePoolRaces(t *testing.T) {
newConfig := func() *config.ScrapeConfig {
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
}
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{
@ -937,35 +939,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
}
func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop {
return newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
nil,
labels.NewSymbolTable(),
0,
true,
false,
true,
0, 0, histogram.ExponentialSchemaMax,
nil,
interval,
time.Hour,
false,
false,
false,
false,
false,
true,
nil,
false,
newTestScrapeMetrics(t),
false,
model.LegacyValidation,
fallback,
)
return newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, interval, time.Hour, false, false, false, false, false, true, nil, false, newTestScrapeMetrics(t), false, model.LegacyValidation, fallback)
}
func TestScrapeLoopStopBeforeRun(t *testing.T) {
@ -1083,35 +1057,7 @@ func TestScrapeLoopRun(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
nil,
nil,
0,
true,
false,
true,
0, 0, histogram.ExponentialSchemaMax,
nil,
time.Second,
time.Hour,
false,
false,
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
false,
model.LegacyValidation,
"",
)
sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, nil, 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, time.Second, time.Hour, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "")
// The loop must terminate during the initial offset if the context
// is canceled.
@ -1230,35 +1176,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
func(_ context.Context) storage.Appender { return nopAppender{} },
cache,
labels.NewSymbolTable(),
0,
true,
false,
true,
0, 0, histogram.ExponentialSchemaMax,
nil,
0,
0,
false,
false,
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
false,
model.LegacyValidation,
"",
)
sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, func(_ context.Context) storage.Appender { return nopAppender{} }, nopRuleEngine{}, cache, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, 0, 0, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "")
defer cancel()
slApp := sl.appender(ctx)
@ -3382,7 +3300,7 @@ func TestReuseScrapeCache(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics",
}
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
labels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
scrapeConfig: &config.ScrapeConfig{},
@ -3567,7 +3485,7 @@ func TestReuseCacheRace(t *testing.T) {
MetricsPath: "/metrics",
}
buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
sp, _ = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, 0, nil, buffers, nil, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
labels: labels.FromStrings("labelNew", "nameNew"),
scrapeConfig: &config.ScrapeConfig{},
@ -3668,7 +3586,7 @@ func TestScrapeReportLimit(t *testing.T) {
ts, scrapedTwice := newScrapableServer("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -3724,7 +3642,7 @@ func TestScrapeUTF8(t *testing.T) {
ts, scrapedTwice := newScrapableServer("{\"with.dots\"} 42\n")
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -3871,7 +3789,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
},
},
}
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
@ -3953,7 +3871,7 @@ test_summary_count 199
ts, scrapedTwice := newScrapableServer(metricsText)
defer ts.Close()
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4297,7 +4215,7 @@ metric: <
foundLeValues[v] = true
}
require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %v but found %v", expectedValues, foundLeValues)
require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %f but found %f", expectedValues, foundLeValues)
for _, v := range expectedValues {
require.Contains(t, foundLeValues, v, "label value not found")
}
@ -4455,7 +4373,7 @@ metric: <
}))
defer ts.Close()
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t))
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4603,7 +4521,7 @@ func TestScrapeLoopCompression(t *testing.T) {
EnableCompression: tc.enableCompression,
}
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4885,7 +4803,7 @@ scrape_configs:
s.DB.EnableNativeHistograms()
reg := prometheus.NewRegistry()
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg)
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg, nil)
require.NoError(t, err)
cfg, err := config.Load(configStr, promslog.NewNopLogger())
require.NoError(t, err)
@ -4928,7 +4846,7 @@ scrape_configs:
it := series.Iterator(nil)
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
if vt != chunkenc.ValHistogram {
// don't care about other samples
// don't care about other floats
continue
}
_, h := it.AtHistogram(nil)
@ -4987,7 +4905,7 @@ func TestTargetScrapeConfigWithLabels(t *testing.T) {
}
}
sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
t.Cleanup(sp.stop)
@ -5138,7 +5056,7 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) {
},
}
p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
t.Cleanup(p.stop)
@ -5153,3 +5071,218 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) {
<-time.After(1 * time.Second)
}
func TestScrapeWithScrapeRules(t *testing.T) {
ts := time.Now()
tests := []struct {
name string
scrape string
discoveryLabels []string
scrapeRules []*config.ScrapeRuleConfig
relabelConfig []*relabel.Config
expectedFloats []floatSample
}{
{
name: "scrape rules without relabeling",
scrape: `
metric{l1="1", l2="1"} 3
metric{l1="1", l2="2"} 5`,
discoveryLabels: []string{"instance", "local"},
scrapeRules: []*config.ScrapeRuleConfig{
{
Expr: "sum by (l1) (metric)",
Record: "l1:metric:sum",
},
},
relabelConfig: nil,
expectedFloats: []floatSample{
{
metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "1"),
t: ts.UnixMilli(),
f: 3,
},
{
metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "2"),
t: ts.UnixMilli(),
f: 5,
},
{
metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"),
t: ts.UnixMilli(),
f: 8,
},
},
},
{
name: "scrape rules with relabeling on original series",
scrape: `
metric{l1="1", l2="1"} 3
metric{l1="1", l2="2"} 5`,
discoveryLabels: []string{"instance", "local"},
scrapeRules: []*config.ScrapeRuleConfig{
{
Expr: "sum by (l1) (metric)",
Record: "l1:metric:sum",
},
},
relabelConfig: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("metric"),
Action: relabel.Drop,
},
},
expectedFloats: []floatSample{
{
metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"),
t: ts.UnixMilli(),
f: 8,
},
},
},
{
name: "scrape rules with relabeling on recorded series",
scrape: `
metric{l1="1", l2="1"} 3
metric{l1="1", l2="2"} 5`,
discoveryLabels: []string{"instance", "local"},
scrapeRules: []*config.ScrapeRuleConfig{
{
Expr: "sum by (l1) (metric)",
Record: "l1:metric:sum",
},
},
relabelConfig: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("l1:metric:sum"),
Action: relabel.Keep,
},
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("l1:metric:sum"),
Action: relabel.Replace,
TargetLabel: "__name__",
Replacement: "relabeled_rule",
},
},
expectedFloats: []floatSample{
{
metric: labels.FromStrings("__name__", "relabeled_rule", "instance", "local", "l1", "1"),
t: ts.UnixMilli(),
f: 8,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
app = &collectResultAppender{}
re = newRuleEngine(test.scrapeRules, promql.NewEngine(
promql.EngineOpts{
MaxSamples: 500000,
Timeout: 30 * time.Second,
}),
)
target = &Target{
labels: labels.FromStrings(test.discoveryLabels...),
}
)
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender { return app }, 0)
sl.sampleMutator = func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, target, false, test.relabelConfig)
}
sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
return mutateReportSampleLabels(l, target)
}
sl.ruleEngine = re
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(test.scrape), "text/plain", ts)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, test.expectedFloats, app.resultFloats)
})
}
}
func TestScrapeReportWithScrapeRules(t *testing.T) {
ts := time.Now()
scrapeRule := config.ScrapeRuleConfig{
Expr: "sum by (l1) (metric)",
Record: "l1:metric:sum",
}
scrape := `
metric{l1="1", l2="1"} 3
metric{l1="1", l2="2"} 5`
_, sl := simpleTestScrapeLoop(t)
sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine(
promql.EngineOpts{
MaxSamples: 500000,
Timeout: 30 * time.Second,
},
))
slApp := sl.appender(context.Background())
scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts)
require.NoError(t, err)
require.Equal(t, 2, scraped)
require.Equal(t, 3, added)
require.Equal(t, 3, seriesAdded)
scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts)
require.NoError(t, err)
require.Equal(t, 2, scraped)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded)
}
func TestScrapeReportWithScrapeRulesAndRelabeling(t *testing.T) {
ts := time.Now()
scrapeRule := config.ScrapeRuleConfig{
Expr: "sum by (l1) (metric)",
Record: "l1:metric:sum",
}
scrape := `
metric{l1="1", l2="1"} 3
metric{l1="1", l2="2"} 5`
_, sl := simpleTestScrapeLoop(t)
sl.sampleMutator = func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, &Target{}, false, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("metric"),
Action: relabel.Drop,
},
})
}
sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine(
promql.EngineOpts{
MaxSamples: 500000,
Timeout: 30 * time.Second,
},
))
slApp := sl.appender(context.Background())
scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts)
require.NoError(t, err)
require.Equal(t, 2, scraped)
require.Equal(t, 1, added)
require.Equal(t, 1, seriesAdded)
scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts)
require.NoError(t, err)
require.Equal(t, 2, scraped)
require.Equal(t, 1, added)
require.Equal(t, 0, seriesAdded)
}