mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge de887c5fe1
into 61aa82865d
This commit is contained in:
commit
0bc7c7549e
|
@ -270,6 +270,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
|
|||
case "promql-delayed-name-removal":
|
||||
c.promqlEnableDelayedNameRemoval = true
|
||||
logger.Info("Experimental PromQL delayed name removal enabled.")
|
||||
case "scrape-rules":
|
||||
c.scrape.EnableScrapeRules = true
|
||||
logger.Info("Experimental scrape-time recording rules enabled.")
|
||||
case "":
|
||||
continue
|
||||
case "old-ui":
|
||||
|
@ -744,12 +747,30 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: logger.With("component", "query engine"),
|
||||
Reg: prometheus.DefaultRegisterer,
|
||||
MaxSamples: cfg.queryMaxSamples,
|
||||
Timeout: time.Duration(cfg.queryTimeout),
|
||||
ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")),
|
||||
LookbackDelta: time.Duration(cfg.lookbackDelta),
|
||||
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
|
||||
// EnableAtModifier and EnableNegativeOffset have to be
|
||||
// always on for regular PromQL as of Prometheus v2.33.
|
||||
EnableAtModifier: true,
|
||||
EnableNegativeOffset: true,
|
||||
EnablePerStepStats: cfg.enablePerStepStats,
|
||||
EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval,
|
||||
}
|
||||
queryEngine := promql.NewEngine(opts)
|
||||
|
||||
scrapeManager, err := scrape.NewManager(
|
||||
&cfg.scrape,
|
||||
logger.With("component", "scrape manager"),
|
||||
logging.NewJSONFileLogger,
|
||||
fanoutStorage,
|
||||
prometheus.DefaultRegisterer,
|
||||
queryEngine,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error("failed to create a scrape manager", "err", err)
|
||||
|
@ -758,9 +779,7 @@ func main() {
|
|||
|
||||
var (
|
||||
tracingManager = tracing.NewManager(logger)
|
||||
|
||||
queryEngine *promql.Engine
|
||||
ruleManager *rules.Manager
|
||||
ruleManager *rules.Manager
|
||||
)
|
||||
|
||||
if cfg.maxprocsEnable {
|
||||
|
@ -787,24 +806,6 @@ func main() {
|
|||
}
|
||||
|
||||
if !agentMode {
|
||||
opts := promql.EngineOpts{
|
||||
Logger: logger.With("component", "query engine"),
|
||||
Reg: prometheus.DefaultRegisterer,
|
||||
MaxSamples: cfg.queryMaxSamples,
|
||||
Timeout: time.Duration(cfg.queryTimeout),
|
||||
ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")),
|
||||
LookbackDelta: time.Duration(cfg.lookbackDelta),
|
||||
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
|
||||
// EnableAtModifier and EnableNegativeOffset have to be
|
||||
// always on for regular PromQL as of Prometheus v2.33.
|
||||
EnableAtModifier: true,
|
||||
EnableNegativeOffset: true,
|
||||
EnablePerStepStats: cfg.enablePerStepStats,
|
||||
EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval,
|
||||
}
|
||||
|
||||
queryEngine = promql.NewEngine(opts)
|
||||
|
||||
ruleManager = rules.NewManager(&rules.ManagerOptions{
|
||||
Appendable: fanoutStorage,
|
||||
Queryable: localStorage,
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"github.com/prometheus/prometheus/discovery"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/relabel"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage/remote/azuread"
|
||||
"github.com/prometheus/prometheus/storage/remote/googleiam"
|
||||
)
|
||||
|
@ -732,6 +733,8 @@ type ScrapeConfig struct {
|
|||
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
|
||||
// List of metric relabel configurations.
|
||||
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
|
||||
// List of rules to execute at scrape time.
|
||||
RuleConfigs []*ScrapeRuleConfig `yaml:"scrape_rule_configs,omitempty"`
|
||||
}
|
||||
|
||||
// SetDirectory joins any relative file paths with dir.
|
||||
|
@ -858,6 +861,38 @@ func (c *ScrapeConfig) MarshalYAML() (interface{}, error) {
|
|||
return discovery.MarshalYAMLWithInlineConfigs(c)
|
||||
}
|
||||
|
||||
// ScrapeRuleConfig is the configuration for rules executed
|
||||
// at scrape time for each individual target.
|
||||
type ScrapeRuleConfig struct {
|
||||
Expr string `yaml:"expr"`
|
||||
Record string `yaml:"record"`
|
||||
}
|
||||
|
||||
func (a *ScrapeRuleConfig) Validate() error {
|
||||
if a.Record == "" {
|
||||
return errors.New("aggregation rule record must not be empty")
|
||||
}
|
||||
|
||||
if a.Expr == "" {
|
||||
return errors.New("aggregation rule expression must not be empty")
|
||||
}
|
||||
|
||||
expr, err := parser.ParseExpr(a.Expr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid scrape rule expression: %w", err)
|
||||
}
|
||||
|
||||
parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
|
||||
if _, ok := node.(*parser.MatrixSelector); ok {
|
||||
err = errors.New("matrix selectors are not allowed in scrape rule expressions")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// StorageConfig configures runtime reloadable configuration options.
|
||||
type StorageConfig struct {
|
||||
TSDBConfig *TSDBConfig `yaml:"tsdb,omitempty"`
|
||||
|
|
|
@ -16,6 +16,7 @@ package config
|
|||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -2566,3 +2567,40 @@ func TestGetScrapeConfigs_Loaded(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestScrapeRuleConfigs(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
ruleConfig ScrapeRuleConfig
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "valid scrape rule config",
|
||||
ruleConfig: ScrapeRuleConfig{
|
||||
Expr: "sum by (label) (metric)",
|
||||
Record: "sum:metric:label",
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "matrix selector in record expression",
|
||||
ruleConfig: ScrapeRuleConfig{
|
||||
Expr: "sum by (label) (rate(metric[2m]))",
|
||||
Record: "sum:metric:label",
|
||||
},
|
||||
err: errors.New("matrix selectors are not allowed in scrape rule expressions"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
err := test.ruleConfig.Validate()
|
||||
if test.err == nil {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, test.err.Error(), err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -419,6 +419,11 @@ static_configs:
|
|||
relabel_configs:
|
||||
[ - <relabel_config> ... ]
|
||||
|
||||
# List of rules that are executed on samples from each scrape.
|
||||
# Each rule is evaluated independently and is executed after relabel_configs and before metric_relabel_configs.
|
||||
scrape_rule_configs:
|
||||
[ - <scrape_rule_config> ... ]
|
||||
|
||||
# List of metric relabel configurations.
|
||||
metric_relabel_configs:
|
||||
[ - <relabel_config> ... ]
|
||||
|
@ -2607,6 +2612,28 @@ anchored on both ends. To un-anchor the regex, use `.*<regex>.*`.
|
|||
Care must be taken with `labeldrop` and `labelkeep` to ensure that metrics are
|
||||
still uniquely labeled once the labels are removed.
|
||||
|
||||
### `<scrape_rule_configs>`
|
||||
|
||||
Scrape rules are applied after target relabeling and before metric relabeling. They can be used to evaluate PromQL expressions against samples from individual scrapes.
|
||||
|
||||
Rules are only evaluated in the context of single scrapes and do not take into account samples from different targets, nor samples that are already ingested in Prometheus.
|
||||
|
||||
Metric relabelings can be applied to series generated by scrape rules.
|
||||
|
||||
A potential use of scrape rules is to aggregate away expensive series before they are ingested in Prometheus.
|
||||
If the source series are not needed, they can be dropped using metric relabeling rules.
|
||||
|
||||
### `<scrape_rule_config>`
|
||||
|
||||
The syntax for a scrape rules is as follows:
|
||||
|
||||
```yaml
|
||||
# The name of the time series to output to. Must be a valid metric name.
|
||||
record: <string>
|
||||
# The PromQL expression to evaluate.
|
||||
expr: <string>
|
||||
```
|
||||
|
||||
### `<metric_relabel_configs>`
|
||||
|
||||
Metric relabeling is applied to samples as the last step before ingestion. It
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
config_util "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -37,7 +39,14 @@ import (
|
|||
)
|
||||
|
||||
// NewManager is the Manager constructor.
|
||||
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
|
||||
func NewManager(
|
||||
o *Options,
|
||||
logger *slog.Logger,
|
||||
newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error),
|
||||
app storage.Appendable,
|
||||
registerer prometheus.Registerer,
|
||||
queryEngine *promql.Engine,
|
||||
) (*Manager, error) {
|
||||
if o == nil {
|
||||
o = &Options{}
|
||||
}
|
||||
|
@ -61,6 +70,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
|
|||
triggerReload: make(chan struct{}, 1),
|
||||
metrics: sm,
|
||||
buffers: pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }),
|
||||
queryEngine: queryEngine,
|
||||
}
|
||||
|
||||
m.metrics.setTargetMetadataCacheGatherer(m)
|
||||
|
@ -90,6 +100,9 @@ type Options struct {
|
|||
// Optional HTTP client options to use when scraping.
|
||||
HTTPClientOptions []config_util.HTTPClientOption
|
||||
|
||||
// EnableScrapeRules enables rule evaluation at scrape time.
|
||||
EnableScrapeRules bool
|
||||
|
||||
// private option for testability.
|
||||
skipOffsetting bool
|
||||
}
|
||||
|
@ -111,6 +124,7 @@ type Manager struct {
|
|||
targetSets map[string][]*targetgroup.Group
|
||||
buffers *pool.Pool
|
||||
|
||||
queryEngine *promql.Engine
|
||||
triggerReload chan struct{}
|
||||
|
||||
metrics *scrapeMetrics
|
||||
|
@ -182,7 +196,7 @@ func (m *Manager) reload() {
|
|||
continue
|
||||
}
|
||||
m.metrics.targetScrapePools.Inc()
|
||||
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
|
||||
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.queryEngine, m.opts, m.metrics)
|
||||
if err != nil {
|
||||
m.metrics.targetScrapePoolsFailed.Inc()
|
||||
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)
|
||||
|
|
|
@ -521,7 +521,7 @@ scrape_configs:
|
|||
)
|
||||
|
||||
opts := Options{}
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
|
||||
require.NoError(t, err)
|
||||
newLoop := func(scrapeLoopOptions) loop {
|
||||
ch <- struct{}{}
|
||||
|
@ -586,7 +586,7 @@ scrape_configs:
|
|||
func TestManagerTargetsUpdates(t *testing.T) {
|
||||
opts := Options{}
|
||||
testRegistry := prometheus.NewRegistry()
|
||||
m, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
m, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
ts := make(chan map[string][]*targetgroup.Group)
|
||||
|
@ -639,7 +639,7 @@ global:
|
|||
|
||||
opts := Options{}
|
||||
testRegistry := prometheus.NewRegistry()
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Load the first config.
|
||||
|
@ -716,7 +716,7 @@ scrape_configs:
|
|||
}
|
||||
|
||||
opts := Options{}
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
reload(scrapeManager, cfg1)
|
||||
|
@ -1055,7 +1055,7 @@ func TestUnregisterMetrics(t *testing.T) {
|
|||
// Check that all metrics can be unregistered, allowing a second manager to be created.
|
||||
for i := 0; i < 2; i++ {
|
||||
opts := Options{}
|
||||
manager, err := NewManager(&opts, nil, nil, nil, reg)
|
||||
manager, err := NewManager(&opts, nil, nil, nil, reg, nil)
|
||||
require.NotNil(t, manager)
|
||||
require.NoError(t, err)
|
||||
// Unregister all metrics.
|
||||
|
@ -1104,13 +1104,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
|
|||
sdMetrics,
|
||||
discovery.Updatert(100*time.Millisecond),
|
||||
)
|
||||
scrapeManager, err := NewManager(
|
||||
opts,
|
||||
nil,
|
||||
nil,
|
||||
app,
|
||||
prometheus.NewRegistry(),
|
||||
)
|
||||
scrapeManager, err := NewManager(opts, nil, nil, app, prometheus.NewRegistry(), nil)
|
||||
require.NoError(t, err)
|
||||
go discoveryManager.Run()
|
||||
go scrapeManager.Run(discoveryManager.SyncCh())
|
||||
|
|
231
scrape/rules.go
Normal file
231
scrape/rules.go
Normal file
|
@ -0,0 +1,231 @@
|
|||
// Copyright 2022 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package scrape
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
type RuleEngine interface {
|
||||
NewScrapeBatch() Batch
|
||||
EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error)
|
||||
}
|
||||
|
||||
// ruleEngine evaluates rules from individual targets at scrape time.
|
||||
type ruleEngine struct {
|
||||
rules []*config.ScrapeRuleConfig
|
||||
engine *promql.Engine
|
||||
}
|
||||
|
||||
// newRuleEngine creates a new RuleEngine.
|
||||
func newRuleEngine(
|
||||
rules []*config.ScrapeRuleConfig,
|
||||
queryEngine *promql.Engine,
|
||||
) RuleEngine {
|
||||
if len(rules) == 0 {
|
||||
return &nopRuleEngine{}
|
||||
}
|
||||
|
||||
return &ruleEngine{
|
||||
rules: rules,
|
||||
engine: queryEngine,
|
||||
}
|
||||
}
|
||||
|
||||
// NewScrapeBatch creates a new Batch which can be used to add samples from a single scrape.
|
||||
// Rules are always evaluated on a single Batch.
|
||||
func (r *ruleEngine) NewScrapeBatch() Batch {
|
||||
return &batch{
|
||||
samples: make([]Sample, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// EvaluateRules executes rules on the given Batch and returns new Samples.
|
||||
func (r *ruleEngine) EvaluateRules(b Batch, ts time.Time, sampleMutator labelsMutator) ([]Sample, error) {
|
||||
var (
|
||||
result []Sample
|
||||
builder = labels.NewScratchBuilder(0)
|
||||
)
|
||||
for _, rule := range r.rules {
|
||||
queryable := storage.QueryableFunc(func(_, _ int64) (storage.Querier, error) {
|
||||
return b, nil
|
||||
})
|
||||
|
||||
query, err := r.engine.NewInstantQuery(context.Background(), queryable, nil, rule.Expr, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
samples, err := query.Exec(context.Background()).Vector()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, s := range samples {
|
||||
builder.Reset()
|
||||
metricNameSet := false
|
||||
s.Metric.Range(func(lbl labels.Label) {
|
||||
if lbl.Name == labels.MetricName {
|
||||
metricNameSet = true
|
||||
builder.Add(labels.MetricName, rule.Record)
|
||||
} else {
|
||||
builder.Add(lbl.Name, lbl.Value)
|
||||
}
|
||||
})
|
||||
if !metricNameSet {
|
||||
builder.Add(labels.MetricName, rule.Record)
|
||||
}
|
||||
builder.Sort()
|
||||
result = append(result, Sample{
|
||||
metric: sampleMutator(builder.Labels()),
|
||||
t: s.T,
|
||||
f: s.F,
|
||||
fh: s.H,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Batch is used to collect floats from a single scrape.
|
||||
type Batch interface {
|
||||
storage.Querier
|
||||
AddFloatSample(labels.Labels, int64, float64)
|
||||
AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram)
|
||||
}
|
||||
|
||||
type batch struct {
|
||||
samples []Sample
|
||||
}
|
||||
|
||||
type Sample struct {
|
||||
metric labels.Labels
|
||||
t int64
|
||||
f float64
|
||||
fh *histogram.FloatHistogram
|
||||
}
|
||||
|
||||
func (b *batch) AddFloatSample(lbls labels.Labels, t int64, f float64) {
|
||||
b.samples = append(b.samples, Sample{
|
||||
metric: lbls,
|
||||
t: t,
|
||||
f: f,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *batch) AddHistogramSample(lbls labels.Labels, t int64, fh *histogram.FloatHistogram) {
|
||||
b.samples = append(b.samples, Sample{
|
||||
metric: lbls,
|
||||
t: t,
|
||||
fh: fh,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *batch) Select(_ context.Context, _ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
var samples []Sample
|
||||
for _, s := range b.samples {
|
||||
match := true
|
||||
for _, matcher := range matchers {
|
||||
if !matcher.Matches(s.metric.Get(matcher.Name)) {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if match {
|
||||
samples = append(samples, s)
|
||||
}
|
||||
}
|
||||
|
||||
return &seriesSet{
|
||||
i: -1,
|
||||
samples: samples,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *batch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (b *batch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (b *batch) Close() error { return nil }
|
||||
|
||||
type seriesSet struct {
|
||||
i int
|
||||
samples []Sample
|
||||
}
|
||||
|
||||
func (s *seriesSet) Next() bool {
|
||||
s.i++
|
||||
return s.i != len(s.samples)
|
||||
}
|
||||
|
||||
func (s *seriesSet) At() storage.Series {
|
||||
sample := s.samples[s.i]
|
||||
if sample.fh != nil {
|
||||
return promql.NewStorageSeries(promql.Series{
|
||||
Metric: sample.metric,
|
||||
Histograms: []promql.HPoint{{T: sample.t, H: sample.fh}},
|
||||
})
|
||||
}
|
||||
|
||||
return promql.NewStorageSeries(promql.Series{
|
||||
Metric: sample.metric,
|
||||
Floats: []promql.FPoint{{T: sample.t, F: sample.f}},
|
||||
})
|
||||
}
|
||||
|
||||
func (s *seriesSet) Err() error { return nil }
|
||||
func (s *seriesSet) Warnings() annotations.Annotations { return nil }
|
||||
|
||||
// nopRuleEngine does not produce any new floats when evaluating rules.
|
||||
type nopRuleEngine struct{}
|
||||
|
||||
func (n nopRuleEngine) NewScrapeBatch() Batch { return &nopBatch{} }
|
||||
|
||||
func (n nopRuleEngine) EvaluateRules(Batch, time.Time, labelsMutator) ([]Sample, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type nopBatch struct{}
|
||||
|
||||
func (b *nopBatch) AddFloatSample(labels.Labels, int64, float64) {}
|
||||
|
||||
func (b *nopBatch) AddHistogramSample(labels.Labels, int64, *histogram.FloatHistogram) {
|
||||
}
|
||||
|
||||
func (b *nopBatch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (b *nopBatch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (b *nopBatch) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *nopBatch) Close() error { return nil }
|
75
scrape/rules_test.go
Normal file
75
scrape/rules_test.go
Normal file
|
@ -0,0 +1,75 @@
|
|||
// Copyright 2022 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package scrape
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
func TestRuleEngine(t *testing.T) {
|
||||
now := time.Now()
|
||||
samples := []Sample{
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/"),
|
||||
t: now.UnixMilli(),
|
||||
f: 10,
|
||||
},
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "http_requests_total", "code", "200", "handler", "/metrics"),
|
||||
t: now.UnixMilli(),
|
||||
f: 6,
|
||||
},
|
||||
}
|
||||
rules := []*config.ScrapeRuleConfig{
|
||||
{
|
||||
Expr: "sum by (code) (http_requests_total)",
|
||||
Record: "code:http_requests_total:sum",
|
||||
},
|
||||
}
|
||||
|
||||
engine := promql.NewEngine(promql.EngineOpts{
|
||||
MaxSamples: 50000000,
|
||||
Timeout: 10 * time.Second,
|
||||
LookbackDelta: 5 * time.Minute,
|
||||
})
|
||||
re := newRuleEngine(rules, engine)
|
||||
b := re.NewScrapeBatch()
|
||||
for _, s := range samples {
|
||||
b.AddFloatSample(s.metric, s.t, s.f)
|
||||
}
|
||||
|
||||
result, err := re.EvaluateRules(b, now, nopMutator)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(result) != 1 {
|
||||
t.Fatalf("Invalid sample count, got %d, want %d", len(result), 3)
|
||||
}
|
||||
|
||||
expectedSamples := []Sample{
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "code:http_requests_total:sum", "code", "200"),
|
||||
t: now.UnixMilli(),
|
||||
f: 16,
|
||||
},
|
||||
}
|
||||
require.ElementsMatch(t, expectedSamples, result)
|
||||
}
|
136
scrape/scrape.go
136
scrape/scrape.go
|
@ -31,6 +31,8 @@ import (
|
|||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"github.com/klauspost/compress/gzip"
|
||||
config_util "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -102,6 +104,8 @@ type scrapePool struct {
|
|||
|
||||
scrapeFailureLogger FailureLogger
|
||||
scrapeFailureLoggerMtx sync.RWMutex
|
||||
|
||||
enableScrapeRuleEval bool
|
||||
}
|
||||
|
||||
type labelLimits struct {
|
||||
|
@ -137,7 +141,16 @@ const maxAheadTime = 10 * time.Minute
|
|||
// returning an empty label set is interpreted as "drop".
|
||||
type labelsMutator func(labels.Labels) labels.Labels
|
||||
|
||||
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
|
||||
func newScrapePool(
|
||||
cfg *config.ScrapeConfig,
|
||||
app storage.Appendable,
|
||||
offsetSeed uint64,
|
||||
logger *slog.Logger,
|
||||
buffers *pool.Pool,
|
||||
queryEngine *promql.Engine,
|
||||
options *Options,
|
||||
metrics *scrapeMetrics,
|
||||
) (*scrapePool, error) {
|
||||
if logger == nil {
|
||||
logger = promslog.NewNopLogger()
|
||||
}
|
||||
|
@ -160,6 +173,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
|
|||
logger: logger,
|
||||
metrics: metrics,
|
||||
httpOpts: options.HTTPClientOptions,
|
||||
enableScrapeRuleEval: options.EnableScrapeRules,
|
||||
}
|
||||
sp.newLoop = func(opts scrapeLoopOptions) loop {
|
||||
// Update the targets retrieval function for metadata to a new scrape cache.
|
||||
|
@ -169,6 +183,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
|
|||
}
|
||||
opts.target.SetMetadataStore(cache)
|
||||
|
||||
var re RuleEngine
|
||||
if sp.enableScrapeRuleEval {
|
||||
re = newRuleEngine(cfg.RuleConfigs, queryEngine)
|
||||
} else {
|
||||
re = &nopRuleEngine{}
|
||||
}
|
||||
|
||||
return newScrapeLoop(
|
||||
ctx,
|
||||
opts.scraper,
|
||||
|
@ -179,6 +200,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
|
|||
},
|
||||
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
|
||||
func(ctx context.Context) storage.Appender { return app.Appender(ctx) },
|
||||
re,
|
||||
cache,
|
||||
sp.symbolTable,
|
||||
offsetSeed,
|
||||
|
@ -893,6 +915,7 @@ type scrapeLoop struct {
|
|||
l *slog.Logger
|
||||
scrapeFailureLogger FailureLogger
|
||||
scrapeFailureLoggerMtx sync.RWMutex
|
||||
ruleEngine RuleEngine
|
||||
cache *scrapeCache
|
||||
lastScrapeSize int
|
||||
buffers *pool.Pool
|
||||
|
@ -1200,13 +1223,15 @@ func (c *scrapeCache) LengthMetadata() int {
|
|||
return len(c.metadata)
|
||||
}
|
||||
|
||||
func newScrapeLoop(ctx context.Context,
|
||||
func newScrapeLoop(
|
||||
ctx context.Context,
|
||||
sc scraper,
|
||||
l *slog.Logger,
|
||||
buffers *pool.Pool,
|
||||
sampleMutator labelsMutator,
|
||||
reportSampleMutator labelsMutator,
|
||||
appender func(ctx context.Context) storage.Appender,
|
||||
re RuleEngine,
|
||||
cache *scrapeCache,
|
||||
symbolTable *labels.SymbolTable,
|
||||
offsetSeed uint64,
|
||||
|
@ -1256,6 +1281,7 @@ func newScrapeLoop(ctx context.Context,
|
|||
sl := &scrapeLoop{
|
||||
scraper: sc,
|
||||
buffers: buffers,
|
||||
ruleEngine: re,
|
||||
cache: cache,
|
||||
appender: appender,
|
||||
symbolTable: symbolTable,
|
||||
|
@ -1648,6 +1674,9 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
|
|||
sl.cache.iterDone(true)
|
||||
}()
|
||||
|
||||
// Make a new batch for evaluating scrape rules
|
||||
scrapeBatch := sl.ruleEngine.NewScrapeBatch()
|
||||
|
||||
loop:
|
||||
for {
|
||||
var (
|
||||
|
@ -1700,15 +1729,11 @@ loop:
|
|||
t = *parsedTimestamp
|
||||
}
|
||||
|
||||
if sl.cache.getDropped(met) {
|
||||
continue
|
||||
}
|
||||
ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
|
||||
var (
|
||||
ref storage.SeriesRef
|
||||
hash uint64
|
||||
)
|
||||
|
||||
ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
|
||||
if seriesCached {
|
||||
ref = ce.ref
|
||||
lset = ce.lset
|
||||
|
@ -1716,31 +1741,44 @@ loop:
|
|||
} else {
|
||||
p.Labels(&lset)
|
||||
hash = lset.Hash()
|
||||
|
||||
// Hash label set as it is seen local to the target. Then add target labels
|
||||
// and relabeling and store the final label set.
|
||||
lset = sl.sampleMutator(lset)
|
||||
|
||||
// The label set may be set to empty to indicate dropping.
|
||||
if lset.IsEmpty() {
|
||||
sl.cache.addDropped(met)
|
||||
continue
|
||||
}
|
||||
if isHistogram && sl.enableNativeHistogramIngestion {
|
||||
if h != nil {
|
||||
scrapeBatch.AddHistogramSample(lset, t, h.ToFloat(nil))
|
||||
} else {
|
||||
scrapeBatch.AddHistogramSample(lset, t, fh)
|
||||
}
|
||||
} else {
|
||||
scrapeBatch.AddFloatSample(lset, t, val)
|
||||
}
|
||||
|
||||
if !lset.Has(labels.MetricName) {
|
||||
err = errNameLabelMandatory
|
||||
break loop
|
||||
}
|
||||
if !lset.IsValid(sl.validationScheme) {
|
||||
err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
|
||||
break loop
|
||||
}
|
||||
if sl.cache.getDropped(met) {
|
||||
continue
|
||||
}
|
||||
|
||||
// If any label limits is exceeded the scrape should fail.
|
||||
if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
|
||||
sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
|
||||
break loop
|
||||
}
|
||||
// Hash label set as it is seen local to the target. Then add target labels
|
||||
// and relabeling and store the final label set.
|
||||
lset = sl.sampleMutator(lset)
|
||||
|
||||
// The label set may be set to empty to indicate dropping.
|
||||
if lset.IsEmpty() {
|
||||
sl.cache.addDropped(met)
|
||||
continue
|
||||
}
|
||||
|
||||
if !lset.Has(labels.MetricName) {
|
||||
err = errNameLabelMandatory
|
||||
break loop
|
||||
}
|
||||
if !lset.IsValid(sl.validationScheme) {
|
||||
err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
|
||||
break loop
|
||||
}
|
||||
|
||||
// If any label limits is exceeded the scrape should fail.
|
||||
if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
|
||||
sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
|
||||
break loop
|
||||
}
|
||||
|
||||
if seriesAlreadyScraped && parsedTimestamp == nil {
|
||||
|
@ -1802,8 +1840,8 @@ loop:
|
|||
}
|
||||
|
||||
// Increment added even if there's an error so we correctly report the
|
||||
// number of samples remaining after relabeling.
|
||||
// We still report duplicated samples here since this number should be the exact number
|
||||
// number of floats remaining after relabeling.
|
||||
// We still report duplicated floats here since this number should be the exact number
|
||||
// of time series exposed on a scrape after relabelling.
|
||||
added++
|
||||
exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
|
||||
|
@ -1889,9 +1927,41 @@ loop:
|
|||
if appErrs.numExemplarOutOfOrder > 0 {
|
||||
sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
|
||||
}
|
||||
if err == nil {
|
||||
err = sl.updateStaleMarkers(app, defTime)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err = sl.updateStaleMarkers(app, defTime); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ruleSamples, ruleErr := sl.ruleEngine.EvaluateRules(scrapeBatch, ts, sl.sampleMutator)
|
||||
if ruleErr != nil {
|
||||
err = ruleErr
|
||||
return
|
||||
}
|
||||
var recordBuf []byte
|
||||
for _, s := range ruleSamples {
|
||||
recordBuf = recordBuf[:0]
|
||||
added++
|
||||
var (
|
||||
ce *cacheEntry
|
||||
ok bool
|
||||
)
|
||||
recordBytes := s.metric.Bytes(recordBuf)
|
||||
ce, ok, _ = sl.cache.get(recordBytes)
|
||||
if ok {
|
||||
_, err = app.Append(ce.ref, s.metric, s.t, s.f)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
var ref storage.SeriesRef
|
||||
ref, err = app.Append(0, s.metric, s.t, s.f)
|
||||
sl.cache.addRef(recordBytes, ref, s.metric, s.metric.Hash())
|
||||
seriesAdded++
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ import (
|
|||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/grafana/regexp"
|
||||
|
@ -84,7 +86,7 @@ func TestNewScrapePool(t *testing.T) {
|
|||
var (
|
||||
app = &nopAppendable{}
|
||||
cfg = &config.ScrapeConfig{}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
)
|
||||
|
||||
a, ok := sp.appendable.(*nopAppendable)
|
||||
|
@ -339,7 +341,7 @@ func TestDroppedTargetsList(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
|
||||
expectedLength = 2
|
||||
)
|
||||
|
@ -509,7 +511,7 @@ func TestScrapePoolReload(t *testing.T) {
|
|||
|
||||
mtx.Lock()
|
||||
targetScraper := opts.scraper.(*targetScraper)
|
||||
require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper)
|
||||
require.True(t, stopped[targetScraper.hash()], "Scrape loop for %f not stopped yet", targetScraper)
|
||||
mtx.Unlock()
|
||||
}
|
||||
return l
|
||||
|
@ -778,7 +780,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
|
|||
func TestScrapePoolAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{}
|
||||
app := &nopAppendable{}
|
||||
sp, _ := newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ := newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
|
||||
loop := sp.newLoop(scrapeLoopOptions{
|
||||
target: &Target{},
|
||||
|
@ -851,7 +853,7 @@ func TestScrapePoolRaces(t *testing.T) {
|
|||
newConfig := func() *config.ScrapeConfig {
|
||||
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
|
||||
}
|
||||
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
tgts := []*targetgroup.Group{
|
||||
{
|
||||
Targets: []model.LabelSet{
|
||||
|
@ -937,35 +939,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
|
|||
}
|
||||
|
||||
func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper scraper, app func(ctx context.Context) storage.Appender, interval time.Duration, fallback string) *scrapeLoop {
|
||||
return newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
app,
|
||||
nil,
|
||||
labels.NewSymbolTable(),
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
0, 0, histogram.ExponentialSchemaMax,
|
||||
nil,
|
||||
interval,
|
||||
time.Hour,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
nil,
|
||||
false,
|
||||
newTestScrapeMetrics(t),
|
||||
false,
|
||||
model.LegacyValidation,
|
||||
fallback,
|
||||
)
|
||||
return newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, interval, time.Hour, false, false, false, false, false, true, nil, false, newTestScrapeMetrics(t), false, model.LegacyValidation, fallback)
|
||||
}
|
||||
|
||||
func TestScrapeLoopStopBeforeRun(t *testing.T) {
|
||||
|
@ -1083,35 +1057,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
app,
|
||||
nil,
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
0, 0, histogram.ExponentialSchemaMax,
|
||||
nil,
|
||||
time.Second,
|
||||
time.Hour,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
false,
|
||||
scrapeMetrics,
|
||||
false,
|
||||
model.LegacyValidation,
|
||||
"",
|
||||
)
|
||||
sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, app, nopRuleEngine{}, nil, nil, 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, time.Second, time.Hour, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "")
|
||||
|
||||
// The loop must terminate during the initial offset if the context
|
||||
// is canceled.
|
||||
|
@ -1230,35 +1176,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
|
|||
defer close(signal)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func(_ context.Context) storage.Appender { return nopAppender{} },
|
||||
cache,
|
||||
labels.NewSymbolTable(),
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
0, 0, histogram.ExponentialSchemaMax,
|
||||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
false,
|
||||
scrapeMetrics,
|
||||
false,
|
||||
model.LegacyValidation,
|
||||
"",
|
||||
)
|
||||
sl := newScrapeLoop(ctx, scraper, nil, nil, nopMutator, nopMutator, func(_ context.Context) storage.Appender { return nopAppender{} }, nopRuleEngine{}, cache, labels.NewSymbolTable(), 0, true, false, true, 0, 0, histogram.ExponentialSchemaMax, nil, 0, 0, false, false, false, false, false, false, nil, false, scrapeMetrics, false, model.LegacyValidation, "")
|
||||
defer cancel()
|
||||
|
||||
slApp := sl.appender(ctx)
|
||||
|
@ -3382,7 +3300,7 @@ func TestReuseScrapeCache(t *testing.T) {
|
|||
ScrapeInterval: model.Duration(5 * time.Second),
|
||||
MetricsPath: "/metrics",
|
||||
}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
t1 = &Target{
|
||||
labels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
|
||||
scrapeConfig: &config.ScrapeConfig{},
|
||||
|
@ -3567,7 +3485,7 @@ func TestReuseCacheRace(t *testing.T) {
|
|||
MetricsPath: "/metrics",
|
||||
}
|
||||
buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, buffers, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
t1 = &Target{
|
||||
labels: labels.FromStrings("labelNew", "nameNew"),
|
||||
scrapeConfig: &config.ScrapeConfig{},
|
||||
|
@ -3668,7 +3586,7 @@ func TestScrapeReportLimit(t *testing.T) {
|
|||
ts, scrapedTwice := newScrapableServer("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")
|
||||
defer ts.Close()
|
||||
|
||||
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
|
@ -3724,7 +3642,7 @@ func TestScrapeUTF8(t *testing.T) {
|
|||
ts, scrapedTwice := newScrapableServer("{\"with.dots\"} 42\n")
|
||||
defer ts.Close()
|
||||
|
||||
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, err := newScrapePool(cfg, s, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
|
@ -3871,7 +3789,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
tgts := []*targetgroup.Group{
|
||||
{
|
||||
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
|
||||
|
@ -3953,7 +3871,7 @@ test_summary_count 199
|
|||
ts, scrapedTwice := newScrapableServer(metricsText)
|
||||
defer ts.Close()
|
||||
|
||||
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
|
@ -4297,7 +4215,7 @@ metric: <
|
|||
foundLeValues[v] = true
|
||||
}
|
||||
|
||||
require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %v but found %v", expectedValues, foundLeValues)
|
||||
require.Equal(t, len(expectedValues), len(foundLeValues), "unexpected number of label values, expected %f but found %f", expectedValues, foundLeValues)
|
||||
for _, v := range expectedValues {
|
||||
require.Contains(t, foundLeValues, v, "label value not found")
|
||||
}
|
||||
|
@ -4455,7 +4373,7 @@ metric: <
|
|||
}))
|
||||
defer ts.Close()
|
||||
|
||||
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t))
|
||||
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
|
@ -4603,7 +4521,7 @@ func TestScrapeLoopCompression(t *testing.T) {
|
|||
EnableCompression: tc.enableCompression,
|
||||
}
|
||||
|
||||
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
defer sp.stop()
|
||||
|
||||
|
@ -4885,7 +4803,7 @@ scrape_configs:
|
|||
s.DB.EnableNativeHistograms()
|
||||
reg := prometheus.NewRegistry()
|
||||
|
||||
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg)
|
||||
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond), EnableNativeHistogramsIngestion: true}, nil, nil, s, reg, nil)
|
||||
require.NoError(t, err)
|
||||
cfg, err := config.Load(configStr, promslog.NewNopLogger())
|
||||
require.NoError(t, err)
|
||||
|
@ -4928,7 +4846,7 @@ scrape_configs:
|
|||
it := series.Iterator(nil)
|
||||
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
|
||||
if vt != chunkenc.ValHistogram {
|
||||
// don't care about other samples
|
||||
// don't care about other floats
|
||||
continue
|
||||
}
|
||||
_, h := it.AtHistogram(nil)
|
||||
|
@ -4987,7 +4905,7 @@ func TestTargetScrapeConfigWithLabels(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
sp, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(sp.stop)
|
||||
|
||||
|
@ -5138,7 +5056,7 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
p, err := newScrapePool(cfg, &nopAppendable{}, 0, nil, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(p.stop)
|
||||
|
||||
|
@ -5153,3 +5071,224 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) {
|
|||
|
||||
<-time.After(1 * time.Second)
|
||||
}
|
||||
|
||||
func TestScrapeWithScrapeRules(t *testing.T) {
|
||||
ts := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
scrape string
|
||||
discoveryLabels []string
|
||||
scrapeRules []*config.ScrapeRuleConfig
|
||||
relabelConfig []*relabel.Config
|
||||
expectedFloats []floatSample
|
||||
}{
|
||||
{
|
||||
name: "scrape rules without relabeling",
|
||||
scrape: `
|
||||
metric{l1="1", l2="1"} 3
|
||||
metric{l1="1", l2="2"} 5`,
|
||||
discoveryLabels: []string{"instance", "local"},
|
||||
scrapeRules: []*config.ScrapeRuleConfig{
|
||||
{
|
||||
Expr: "sum by (l1) (metric)",
|
||||
Record: "l1:metric:sum",
|
||||
},
|
||||
},
|
||||
relabelConfig: nil,
|
||||
expectedFloats: []floatSample{
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "1"),
|
||||
t: ts.UnixMilli(),
|
||||
f: 3,
|
||||
},
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "metric", "instance", "local", "l1", "1", "l2", "2"),
|
||||
t: ts.UnixMilli(),
|
||||
f: 5,
|
||||
},
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"),
|
||||
t: ts.UnixMilli(),
|
||||
f: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "scrape rules with relabeling on original series",
|
||||
scrape: `
|
||||
metric{l1="1", l2="1"} 3
|
||||
metric{l1="1", l2="2"} 5`,
|
||||
discoveryLabels: []string{"instance", "local"},
|
||||
scrapeRules: []*config.ScrapeRuleConfig{
|
||||
{
|
||||
Expr: "sum by (l1) (metric)",
|
||||
Record: "l1:metric:sum",
|
||||
},
|
||||
},
|
||||
relabelConfig: []*relabel.Config{
|
||||
{
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: relabel.MustNewRegexp("metric"),
|
||||
Action: relabel.Drop,
|
||||
},
|
||||
},
|
||||
expectedFloats: []floatSample{
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "l1:metric:sum", "instance", "local", "l1", "1"),
|
||||
t: ts.UnixMilli(),
|
||||
f: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "scrape rules with relabeling on recorded series",
|
||||
scrape: `
|
||||
metric{l1="1", l2="1"} 3
|
||||
metric{l1="1", l2="2"} 5`,
|
||||
discoveryLabels: []string{"instance", "local"},
|
||||
scrapeRules: []*config.ScrapeRuleConfig{
|
||||
{
|
||||
Expr: "sum by (l1) (metric)",
|
||||
Record: "l1:metric:sum",
|
||||
},
|
||||
},
|
||||
relabelConfig: []*relabel.Config{
|
||||
{
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: relabel.MustNewRegexp("l1:metric:sum"),
|
||||
Action: relabel.Keep,
|
||||
},
|
||||
{
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: relabel.MustNewRegexp("l1:metric:sum"),
|
||||
Action: relabel.Replace,
|
||||
TargetLabel: "__name__",
|
||||
Replacement: "relabeled_rule",
|
||||
},
|
||||
},
|
||||
expectedFloats: []floatSample{
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "relabeled_rule", "instance", "local", "l1", "1"),
|
||||
t: ts.UnixMilli(),
|
||||
f: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
var (
|
||||
app = &collectResultAppender{}
|
||||
re = newRuleEngine(test.scrapeRules, promql.NewEngine(
|
||||
promql.EngineOpts{
|
||||
MaxSamples: 500000,
|
||||
Timeout: 30 * time.Second,
|
||||
}),
|
||||
)
|
||||
target = &Target{
|
||||
labels: labels.FromStrings(test.discoveryLabels...),
|
||||
}
|
||||
)
|
||||
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender { return app }, 0)
|
||||
sl.sampleMutator = func(l labels.Labels) labels.Labels {
|
||||
return mutateSampleLabels(l, target, false, test.relabelConfig)
|
||||
}
|
||||
sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
|
||||
return mutateReportSampleLabels(l, target)
|
||||
}
|
||||
sl.ruleEngine = re
|
||||
|
||||
slApp := sl.appender(context.Background())
|
||||
_, _, _, err := sl.append(slApp, []byte(test.scrape), "text/plain", ts)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, slApp.Commit())
|
||||
require.Len(t, app.resultFloats, len(test.expectedFloats))
|
||||
for i := range app.resultFloats {
|
||||
expected, actual := test.expectedFloats[i], app.resultFloats[i]
|
||||
require.Equal(t, expected.metric.String(), actual.metric.String())
|
||||
require.Equal(t, expected.t, actual.t)
|
||||
require.Equal(t, expected.f, actual.f)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapeReportWithScrapeRules(t *testing.T) {
|
||||
ts := time.Now()
|
||||
|
||||
scrapeRule := config.ScrapeRuleConfig{
|
||||
Expr: "sum by (l1) (metric)",
|
||||
Record: "l1:metric:sum",
|
||||
}
|
||||
|
||||
scrape := `
|
||||
metric{l1="1", l2="1"} 3
|
||||
metric{l1="1", l2="2"} 5`
|
||||
|
||||
_, sl := simpleTestScrapeLoop(t)
|
||||
sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine(
|
||||
promql.EngineOpts{
|
||||
MaxSamples: 500000,
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
))
|
||||
|
||||
slApp := sl.appender(context.Background())
|
||||
scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, scraped)
|
||||
require.Equal(t, 3, added)
|
||||
require.Equal(t, 3, seriesAdded)
|
||||
|
||||
scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, scraped)
|
||||
require.Equal(t, 3, added)
|
||||
require.Equal(t, 0, seriesAdded)
|
||||
}
|
||||
|
||||
func TestScrapeReportWithScrapeRulesAndRelabeling(t *testing.T) {
|
||||
ts := time.Now()
|
||||
|
||||
scrapeRule := config.ScrapeRuleConfig{
|
||||
Expr: "sum by (l1) (metric)",
|
||||
Record: "l1:metric:sum",
|
||||
}
|
||||
|
||||
scrape := `
|
||||
metric{l1="1", l2="1"} 3
|
||||
metric{l1="1", l2="2"} 5`
|
||||
|
||||
_, sl := simpleTestScrapeLoop(t)
|
||||
sl.sampleMutator = func(l labels.Labels) labels.Labels {
|
||||
return mutateSampleLabels(l, &Target{}, false, []*relabel.Config{
|
||||
{
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: relabel.MustNewRegexp("metric"),
|
||||
Action: relabel.Drop,
|
||||
},
|
||||
})
|
||||
}
|
||||
sl.ruleEngine = newRuleEngine([]*config.ScrapeRuleConfig{&scrapeRule}, promql.NewEngine(
|
||||
promql.EngineOpts{
|
||||
MaxSamples: 500000,
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
))
|
||||
|
||||
slApp := sl.appender(context.Background())
|
||||
scraped, added, seriesAdded, err := sl.append(slApp, []byte(scrape), "text/plain", ts)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, scraped)
|
||||
require.Equal(t, 1, added)
|
||||
require.Equal(t, 1, seriesAdded)
|
||||
|
||||
scraped, added, seriesAdded, err = sl.append(slApp, []byte(scrape), "text/plain", ts)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, scraped)
|
||||
require.Equal(t, 1, added)
|
||||
require.Equal(t, 0, seriesAdded)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue