feat(exemplars): exemplars support for recording rules

Signed-off-by: fwippe <frank.wippermueller@gmail.com>
This commit is contained in:
fwippe 2024-12-06 12:49:31 +01:00
parent 804ab49cfc
commit bcf2fcf01c
17 changed files with 1119 additions and 534 deletions

View file

@ -203,8 +203,9 @@ type flagConfig struct {
featureList []string
// These options are extracted from featureList
// for ease of use.
enablePerStepStats bool
enableConcurrentRuleEval bool
enablePerStepStats bool
enableConcurrentRuleEval bool
enableExemplarsInRecordingRules bool
prometheusURL string
corsRegexString string
@ -269,6 +270,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "promql-delayed-name-removal":
c.promqlEnableDelayedNameRemoval = true
logger.Info("Experimental PromQL delayed name removal enabled.")
case "exemplars-in-recording-rules":
c.enableExemplarsInRecordingRules = true
logger.Info("Experimental exemplars support in recording rules enabled.")
case "":
continue
case "old-ui":
@ -801,10 +805,16 @@ func main() {
queryEngine = promql.NewEngine(opts)
var exemplarQueryFunc rules.ExemplarQueryFunc
if cfg.enableExemplarsInRecordingRules {
exemplarQueryFunc = rules.ExemplarQuerierQueryFunc(localStorage)
}
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
ExemplarQueryFunc: exemplarQueryFunc,
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,

View file

@ -452,7 +452,7 @@ Outer:
lb, err := parser.ParseMetric(s.Labels)
var hist *histogram.FloatHistogram
if err == nil && s.Histogram != "" {
_, values, parseErr := parser.ParseSeriesDesc("{} " + s.Histogram)
_, values, _, parseErr := parser.ParseSeriesDesc("{} " + s.Histogram)
switch {
case parseErr != nil:
err = parseErr

View file

@ -151,3 +151,19 @@ Configuration reloads are triggered by detecting changes in the checksum of the
main configuration file or any referenced files, such as rule and scrape
configurations. To ensure consistency and avoid issues during reloads, it's
recommended to update these files atomically.
## Exemplars in Recording Rules
`--enable-feature=exemplars-in-recording-rules`
When enabled, Prometheus will include exemplars in the resulting time series generated by recording rules.
This is accomplished by matching the underlying exemplars of the referenced time series against the result labels
of the recording rule, before renaming labels according to the rule's configuration.
Note that this feature won't be able to match exemplars for certain label renaming scenarios, such as
1. When the recording rule renames labels via `label_replace` or `label_join` functions.
2. When the recording rule in turn references other recording rules that have renamed their labels as part of
their configuration.
Note that this feature is experimental and may change in the future.

View file

@ -39,6 +39,7 @@ import (
lblList []labels.Label
strings []string
series []SequenceValue
exemplars []Exemplar
histogram *histogram.FloatHistogram
descriptors map[string]interface{}
bucket_set []float64
@ -72,6 +73,7 @@ SEMICOLON
SPACE
STRING
TIMES
HASH
// Histogram Descriptors.
%token histogramDescStart
@ -180,6 +182,7 @@ START_METRIC_SELECTOR
%type <label> label_set_item
%type <strings> grouping_label_list grouping_labels maybe_grouping_labels
%type <series> series_item series_values
%type <exemplars> exemplar_item exemplars
%type <histogram> histogram_series_value
%type <descriptors> histogram_desc_map histogram_desc_item
%type <bucket_set> bucket_set bucket_set_list
@ -688,15 +691,34 @@ label_set_item : IDENTIFIER EQL STRING
* The syntax is described in https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#series
*/
series_description: metric series_values
series_description: metric series_values HASH SPACE exemplars
{
yylex.(*parser).generatedParserResult = &seriesDescription{
labels: $1,
values: $2,
labels: $1,
values: $2,
exemplars: $5,
}
}
| metric series_values
{
yylex.(*parser).generatedParserResult = &seriesDescription{
labels: $1,
values: $2,
}
}
;
exemplars : exemplar_item
{ $$ = $1 }
| exemplars SPACE HASH SPACE exemplar_item
{ $$ = append($1, $5...) }
| error
{ yylex.(*parser).unexpected("exemplars", ""); $$ = nil }
;
exemplar_item : label_set SPACE signed_or_unsigned_number SPACE uint
{ $$ = []Exemplar{{Labels: $1, Value: $3, Sequence: $5}}}
series_values : /*empty*/
{ $$ = []SequenceValue{} }
| series_values SPACE series_item

File diff suppressed because it is too large Load diff

View file

@ -181,6 +181,7 @@ var ItemTypeStr = map[ItemType]string{
BLANK: "_",
TIMES: "x",
SPACE: "<space>",
HASH: "#",
SUB: "-",
ADD: "+",
@ -282,6 +283,7 @@ type Lexer struct {
// series description variables for internal PromQL testing framework as well as in promtool rules unit tests.
// see https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#series
seriesDesc bool // Whether we are lexing a series description.
exemplars bool // Whether we are lexing exemplars (inside a series description).
histogramState histogramState // Determines whether or not inside of a histogram description.
}
@ -388,6 +390,9 @@ func lexStatements(l *Lexer) stateFn {
if l.braceOpen {
return lexInsideBraces
}
if l.exemplars {
return lexExemplars
}
if strings.HasPrefix(l.input[l.pos:], lineComment) {
return lexLineComment
}
@ -687,6 +692,9 @@ func lexInsideBraces(l *Lexer) stateFn {
l.braceOpen = false
if l.seriesDesc {
if l.exemplars {
return lexExemplars
}
return lexValueSequence
}
return lexStatements
@ -721,6 +729,10 @@ func lexValueSequence(l *Lexer) stateFn {
l.emit(SUB)
case r == 'x':
l.emit(TIMES)
case r == '#':
l.emit(HASH)
l.exemplars = true
return lexExemplars
case r == '_':
l.emit(BLANK)
case isDigit(r) || (r == '.' && isDigit(l.peek())):
@ -736,6 +748,39 @@ func lexValueSequence(l *Lexer) stateFn {
return lexValueSequence
}
// lexExemplars scans a sequence of exemplars as part of a series description.
func lexExemplars(l *Lexer) stateFn {
switch r := l.next(); {
case r == eof:
l.backup()
l.exemplars = false
return lexStatements
case r == '{':
l.emit(LEFT_BRACE)
l.braceOpen = true
return lexInsideBraces
case isSpace(r):
l.emit(SPACE)
lexSpace(l)
case r == '+':
l.emit(ADD)
case r == '-':
l.emit(SUB)
case r == '#':
l.emit(HASH)
case isDigit(r) || (r == '.' && isDigit(l.peek())):
l.backup()
lexNumber(l)
case isAlpha(r):
l.backup()
// We might lex invalid Items here but this will be caught by the parser.
return lexKeywordOrIdentifier
default:
return l.errorf("unexpected character in exemplars: %q", r)
}
return lexExemplars
}
// lexEscape scans a string escape sequence. The initial escaping character (\)
// has already been seen.
//
@ -1048,6 +1093,9 @@ Loop:
}
}
if l.seriesDesc && l.peek() != '{' {
if l.exemplars {
return lexExemplars
}
return lexValueSequence
}
return lexStatements

View file

@ -739,6 +739,28 @@ var tests = []struct {
},
seriesDesc: true,
},
{
input: `{} 1 # {foo="bar"} 1 1`,
expected: []Item{
{LEFT_BRACE, 0, `{`},
{RIGHT_BRACE, 1, `}`},
{SPACE, 2, ` `},
{NUMBER, 3, `1`},
{SPACE, 4, ` `},
{HASH, 5, `#`},
{SPACE, 6, ` `},
{LEFT_BRACE, 7, `{`},
{IDENTIFIER, 8, `foo`},
{EQL, 11, `=`},
{STRING, 12, `"bar"`},
{RIGHT_BRACE, 17, `}`},
{SPACE, 18, ` `},
{NUMBER, 19, `1`},
{SPACE, 20, ` `},
{NUMBER, 21, `1`},
},
seriesDesc: true,
},
},
},
{

View file

@ -239,14 +239,22 @@ func (v SequenceValue) String() string {
return fmt.Sprintf("%f", v.Value)
}
// Exemplar is an optional sample for a sequence of time series values.
type Exemplar struct {
Labels labels.Labels
Value float64
Sequence uint64
}
type seriesDescription struct {
labels labels.Labels
values []SequenceValue
labels labels.Labels
values []SequenceValue
exemplars []Exemplar
}
// ParseSeriesDesc parses the description of a time series. It is only used in
// the PromQL testing framework code.
func ParseSeriesDesc(input string) (labels labels.Labels, values []SequenceValue, err error) {
func ParseSeriesDesc(input string) (labels labels.Labels, values []SequenceValue, exemplars []Exemplar, err error) {
p := NewParser(input)
p.lex.seriesDesc = true
@ -259,13 +267,14 @@ func ParseSeriesDesc(input string) (labels labels.Labels, values []SequenceValue
labels = result.labels
values = result.values
exemplars = result.exemplars
}
if len(p.parseErrors) != 0 {
err = p.parseErrors
}
return labels, values, err
return labels, values, exemplars, err
}
// addParseErrf formats the error and appends it to the list of parsing errors.

View file

@ -4081,7 +4081,7 @@ func TestParseSeriesDesc(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
l, v, err := ParseSeriesDesc(tc.input)
l, v, _, err := ParseSeriesDesc(tc.input)
if tc.expectError != "" {
require.Contains(t, err.Error(), tc.expectError)
} else {
@ -4104,10 +4104,11 @@ func TestNaNExpression(t *testing.T) {
}
var testSeries = []struct {
input string
expectedMetric labels.Labels
expectedValues []SequenceValue
fail bool
input string
expectedMetric labels.Labels
expectedValues []SequenceValue
expectedExemplars []Exemplar
fail bool
}{
{
input: `{} 1 2 3`,
@ -4186,6 +4187,14 @@ var testSeries = []struct {
input: `my_metric{a="\u70ac = torch"} 1 2 3`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", `炬 = torch`),
expectedValues: newSeq(1, 2, 3),
}, {
input: `my_metric{a="b"} 1 2 3+1x4 # {trace_id="1234"} 4 0 # {trace_id="2345", span_id="1122"} 5 1`,
expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"),
expectedValues: newSeq(1, 2, 3, 4, 5, 6, 7),
expectedExemplars: []Exemplar{
{Labels: labels.FromStrings("trace_id", "1234"), Value: 4, Sequence: 0},
{Labels: labels.FromStrings("trace_id", "2345", "span_id", "1122"), Value: 5, Sequence: 1},
},
}, {
input: `my_metric{a="b"} -3-3 -3`,
fail: true,
@ -4462,7 +4471,7 @@ func TestParseHistogramSeries(t *testing.T) {
{
name: "space after {{",
input: `{} {{ schema:1}}`,
expectedError: `1:7: parse error: unexpected "<Item 57372>" "schema" in series values`,
expectedError: `1:7: parse error: unexpected "<Item 57373>" "schema" in series values`,
},
{
name: "invalid counter reset hint value",
@ -4499,7 +4508,7 @@ func TestParseHistogramSeries(t *testing.T) {
},
} {
t.Run(test.name, func(t *testing.T) {
_, vals, err := ParseSeriesDesc(test.input)
_, vals, _, err := ParseSeriesDesc(test.input)
if test.expectedError != "" {
require.EqualError(t, err, test.expectedError)
return
@ -4571,7 +4580,7 @@ func TestHistogramTestExpression(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
expression := test.input.TestExpression()
require.Equal(t, test.expected, expression)
_, vals, err := ParseSeriesDesc("{} " + expression)
_, vals, _, err := ParseSeriesDesc("{} " + expression)
require.NoError(t, err)
require.Len(t, vals, 1)
canonical := vals[0].Histogram
@ -4583,15 +4592,20 @@ func TestHistogramTestExpression(t *testing.T) {
func TestParseSeries(t *testing.T) {
for _, test := range testSeries {
metric, vals, err := ParseSeriesDesc(test.input)
metric, vals, exemplars, err := ParseSeriesDesc(test.input)
// Unexpected errors are always caused by a bug.
require.NotEqual(t, err, errUnexpected, "unexpected error occurred")
if !test.fail {
require.NoError(t, err)
testutil.RequireEqual(t, test.expectedMetric, metric, "error on input '%s'", test.input)
testutil.RequireEqual(t, test.expectedMetric, metric, "error in input '%s'", test.input)
require.Equal(t, test.expectedValues, vals, "error in input '%s'", test.input)
if test.expectedExemplars == nil {
require.Empty(t, exemplars, "error in input '%s'", test.input)
} else {
require.Equal(t, test.expectedExemplars, exemplars, "error in input '%s'", test.input)
}
} else {
require.Error(t, err)
}

View file

@ -243,24 +243,24 @@ func parseLoad(lines []string, i int) (int, *loadCmd, error) {
i--
break
}
metric, vals, err := parseSeries(defLine, i)
metric, vals, exemplars, err := parseSeries(defLine, i)
if err != nil {
return i, nil, err
}
cmd.set(metric, vals...)
cmd.set(metric, vals, exemplars)
}
return i, cmd, nil
}
func parseSeries(defLine string, line int) (labels.Labels, []parser.SequenceValue, error) {
metric, vals, err := parser.ParseSeriesDesc(defLine)
func parseSeries(defLine string, line int) (labels.Labels, []parser.SequenceValue, []parser.Exemplar, error) {
metric, vals, exemplars, err := parser.ParseSeriesDesc(defLine)
if err != nil {
parser.EnrichParseError(err, func(parseErr *parser.ParseErr) {
parseErr.LineOffset = line
})
return labels.Labels{}, nil, err
return labels.Labels{}, nil, nil, err
}
return metric, vals, nil
return metric, vals, exemplars, nil
}
func (t *test) parseEval(lines []string, i int) (int, *evalCmd, error) {
@ -379,7 +379,7 @@ func (t *test) parseEval(lines []string, i int) (int, *evalCmd, error) {
cmd.expect(0, parser.SequenceValue{Value: f})
break
}
metric, vals, err := parseSeries(defLine, i)
metric, vals, _, err := parseSeries(defLine, i)
if err != nil {
return i, nil, err
}
@ -471,7 +471,7 @@ func (cmd loadCmd) String() string {
}
// set a sequence of sample values for the given metric.
func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) {
func (cmd *loadCmd) set(m labels.Labels, vals []parser.SequenceValue, ex []parser.Exemplar) {
h := m.Hash()
samples := make([]promql.Sample, 0, len(vals))
@ -486,12 +486,25 @@ func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) {
}
ts = ts.Add(cmd.gap)
}
exemplars := make([]exemplar.Exemplar, 0, len(ex))
for _, e := range ex {
ts = testStartTime.Add(time.Millisecond * time.Duration(cmd.gap.Milliseconds()*int64(e.Sequence)))
exemplars = append(exemplars, exemplar.Exemplar{
Labels: e.Labels,
Value: e.Value,
Ts: ts.UnixNano() / int64(time.Millisecond/time.Nanosecond),
HasTs: true,
})
}
cmd.defs[h] = samples
cmd.metrics[h] = m
cmd.exemplars[h] = exemplars
}
// append the defined time series to the storage.
func (cmd *loadCmd) append(a storage.Appender) error {
func (cmd *loadCmd) append(a storage.Appender, ea storage.ExemplarAppender) error {
for h, smpls := range cmd.defs {
m := cmd.metrics[h]
@ -500,6 +513,14 @@ func (cmd *loadCmd) append(a storage.Appender) error {
return err
}
}
if expls, ok := cmd.exemplars[h]; ok {
for _, e := range expls {
if err := appendExemplar(ea, e, m); err != nil {
return err
}
}
}
}
if cmd.withNHCB {
return cmd.appendCustomHistogram(a)
@ -618,6 +639,13 @@ func appendSample(a storage.Appender, s promql.Sample, m labels.Labels) error {
return nil
}
func appendExemplar(a storage.ExemplarAppender, e exemplar.Exemplar, m labels.Labels) error {
if _, err := a.AppendExemplar(0, m, e); err != nil {
return err
}
return nil
}
// evalCmd is a command that evaluates an expression for the given time (range)
// and expects a specific result.
type evalCmd struct {
@ -1086,7 +1114,7 @@ func (t *test) exec(tc testCommand, engine promql.QueryEngine) error {
case *loadCmd:
app := t.storage.Appender(t.context)
if err := cmd.append(app); err != nil {
if err := cmd.append(app, t.storage.(*teststorage.TestStorage).ExemplarAppender()); err != nil {
app.Rollback()
return err
}

View file

@ -23,6 +23,8 @@ import (
"sync"
"time"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
@ -496,6 +498,18 @@ func (r *AlertingRule) Eval(ctx context.Context, queryOffset time.Duration, ts t
return vec, nil
}
// EvalWithExemplars is currently a pass through to Eval.
func (r *AlertingRule) EvalWithExemplars(ctx context.Context, queryOffset time.Duration, evaluationTime time.Time, queryFunc QueryFunc,
_ ExemplarQueryFunc, externalURL *url.URL, limit int,
) (promql.Vector, []exemplar.QueryResult, error) {
// TODO: Right now EvalWithExemplars doesn't emit exemplars given the fact that alert state label isn't as straight forward to handle.
vector, err := r.Eval(ctx, queryOffset, evaluationTime, queryFunc, externalURL, limit)
if err != nil {
return nil, nil, err
}
return vector, nil, nil
}
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive.
func (r *AlertingRule) State() AlertState {

View file

@ -34,6 +34,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
@ -537,7 +538,17 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
var (
vector promql.Vector
eqr []exemplar.QueryResult
err error
)
if g.opts.ExemplarQueryFunc != nil {
vector, eqr, err = rule.EvalWithExemplars(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExemplarQueryFunc, g.opts.ExternalURL, g.Limit())
} else {
vector, err = rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
}
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
@ -624,6 +635,17 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
logger.Warn("Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates)
}
// For each series, append exemplars. Reuse the ref for faster appends.
for _, eq := range eqr {
var ref storage.SeriesRef
for _, e := range eq.Exemplars {
ref, err = app.AppendExemplar(ref, eq.SeriesLabels, e)
if err != nil {
logger.Warn("Rule evaluation exemplar discarded", "err", err, "exemplar", e)
}
}
}
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.

View file

@ -24,6 +24,9 @@ import (
"sync"
"time"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promslog"
"golang.org/x/sync/semaphore"
@ -88,6 +91,31 @@ func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.
g.setLastEvalTimestamp(evalTimestamp)
}
// ExemplarQueryFunc extracts exemplars for a given expression and time range.
type ExemplarQueryFunc func(ctx context.Context, nameSelectors [][]*labels.Matcher, ts time.Time, interval time.Duration) ([]exemplar.QueryResult, error)
// ExemplarQuerierQueryFunc creates a new ExemplarQueryFunc from given exemplar storage.
func ExemplarQuerierQueryFunc(q storage.ExemplarQueryable) ExemplarQueryFunc {
return func(ctx context.Context, nameSelectors [][]*labels.Matcher, ts time.Time, interval time.Duration) ([]exemplar.QueryResult, error) {
if len(nameSelectors) < 1 {
return nil, nil
}
eq, err := q.ExemplarQuerier(ctx)
if err != nil {
return nil, err
}
// Query all the raw exemplars that match the named selectors.
ex, err := eq.Select(timestamp.FromTime(ts.Add(-interval)), timestamp.FromTime(ts), nameSelectors...)
if err != nil {
return nil, err
}
return ex, nil
}
}
// The Manager manages recording and alerting rules.
type Manager struct {
opts *ManagerOptions
@ -107,6 +135,7 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
type ManagerOptions struct {
ExternalURL *url.URL
QueryFunc QueryFunc
ExemplarQueryFunc ExemplarQueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable storage.Appendable

View file

@ -19,6 +19,8 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/stretchr/testify/require"
"github.com/prometheus/common/promslog"
@ -35,6 +37,12 @@ func (u unknownRule) Labels() labels.Labels { return labels.EmptyLabels() }
func (u unknownRule) Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) {
return nil, nil
}
func (u unknownRule) EvalWithExemplars(context.Context, time.Duration, time.Time, QueryFunc,
ExemplarQueryFunc, *url.URL, int,
) (promql.Vector, []exemplar.QueryResult, error) {
return nil, nil, nil
}
func (u unknownRule) String() string { return "" }
func (u unknownRule) Query() parser.Expr { return nil }
func (u unknownRule) SetLastError(error) {}

View file

@ -18,8 +18,11 @@ import (
"errors"
"fmt"
"net/url"
"sync"
"time"
"github.com/prometheus/prometheus/model/exemplar"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
@ -31,9 +34,10 @@ import (
// A RecordingRule records its vector expression into new timeseries.
type RecordingRule struct {
name string
vector parser.Expr
labels labels.Labels
name string
vector parser.Expr
nameSelectors [][]*labels.Matcher
labels labels.Labels
// The health of the recording rule.
health *atomic.String
// Timestamp of last evaluation of the recording rule.
@ -45,21 +49,34 @@ type RecordingRule struct {
noDependentRules *atomic.Bool
noDependencyRules *atomic.Bool
// exemplarsCacheMtx Protects the `matchedExemplarHashes` map.
exemplarsCacheMtx sync.Mutex
// A set of combination of time series label hashes and exemplar label hashes that have already been matched successfully.
matchedExemplarHashes map[*hashTuple]struct{}
}
type hashTuple struct {
seriesHash uint64
exemplarHash uint64
}
// NewRecordingRule returns a new recording rule.
func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *RecordingRule {
return &RecordingRule{
name: name,
vector: vector,
labels: lset,
health: atomic.NewString(string(HealthUnknown)),
evaluationTimestamp: atomic.NewTime(time.Time{}),
evaluationDuration: atomic.NewDuration(0),
lastError: atomic.NewError(nil),
noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
result := &RecordingRule{
name: name,
vector: vector,
labels: lset,
health: atomic.NewString(string(HealthUnknown)),
evaluationTimestamp: atomic.NewTime(time.Time{}),
evaluationDuration: atomic.NewDuration(0),
lastError: atomic.NewError(nil),
noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
matchedExemplarHashes: map[*hashTuple]struct{}{},
}
result.loadNameSelectors()
return result
}
// Name returns the rule name.
@ -77,6 +94,100 @@ func (rule *RecordingRule) Labels() labels.Labels {
return rule.labels
}
// EvalWithExemplars will include exemplars in the Eval results. This is accomplished by matching the underlying
// exemplars of the referenced time series against the result labels of the recording rule, before renaming labels
// according to the rule's configuration.
// Note that this feature won't be able to match exemplars for certain label renaming scenarios, such as
// 1. When the recording rule renames labels via label_replace or label_join functions.
// 2. When the recording rule in turn references other recording rules that have renamed their labels as part of
// their configuration.
func (rule *RecordingRule) EvalWithExemplars(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc,
exemplarQuery ExemplarQueryFunc, _ *url.URL, limit int,
) (promql.Vector, []exemplar.QueryResult, error) {
ctx = NewOriginContext(ctx, NewRuleDetail(rule))
vector, err := query(ctx, rule.vector.String(), ts.Add(-queryOffset))
if err != nil {
return nil, nil, err
}
var resultExemplars []exemplar.QueryResult
if len(vector) < 1 {
return vector, nil, nil
}
// Query all the raw exemplars that match the query. Exemplars "match" the query if and only if they
// satisfy the query's selectors, i.e., belong to the same referenced time series and interval.
exemplars, err := exemplarQuery(ctx, rule.nameSelectors, ts, queryOffset)
if err != nil {
return nil, nil, err
}
if len(exemplars) > 0 {
// Loop through each of the new series and try matching the new series labels against the exemplar series labels.
// If they match, replace the exemplar series labels with the incoming series labels. This is to ensure that the
// exemplars are stored against the refID of the new series. If there is no match then drop the exemplar.
// There will only ever be one exemplar per sample; in case of multiple matches, the winner is determined
// according to the order of the exemplar query result.
for _, sample := range vector {
sampleHash := sample.Metric.Hash()
sampleMatchers := make([]*labels.Matcher, 0, sample.Metric.Len())
for _, ex := range exemplars {
exemplarHash := ex.SeriesLabels.Hash()
if rule.hasAlreadyBeenMatchedSuccessfully(sampleHash, exemplarHash) {
ex.SeriesLabels = sample.Metric
resultExemplars = append(resultExemplars, ex)
break
}
if len(sampleMatchers) == 0 {
sample.Metric.Range(func(l labels.Label) {
sampleMatchers = append(sampleMatchers, labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value))
})
}
if ok := matches(ex.SeriesLabels, sampleMatchers...); ok {
rule.recordMatch(sampleHash, exemplarHash)
ex.SeriesLabels = sample.Metric
resultExemplars = append(resultExemplars, ex)
break
}
}
}
}
if err = rule.applyVectorLabels(vector); err != nil {
return nil, nil, err
}
if err = rule.limit(vector, limit); err != nil {
return nil, nil, err
}
if len(resultExemplars) > 0 {
rule.applyExemplarLabels(resultExemplars)
}
rule.SetHealth(HealthGood)
rule.SetLastError(err)
return vector, resultExemplars, nil
}
// applyExemplarLabels applies labels from the rule and sets the metric name for the exemplar result.
func (rule *RecordingRule) applyExemplarLabels(ex []exemplar.QueryResult) {
// Override the metric name and labels.
lb := labels.NewBuilder(labels.EmptyLabels())
for i := range ex {
e := &ex[i]
e.SeriesLabels = rule.applyLabels(lb, e.SeriesLabels)
}
}
func (rule *RecordingRule) applyLabels(lb *labels.Builder, ls labels.Labels) labels.Labels {
lb.Reset(ls)
lb.Set(labels.MetricName, rule.name)
rule.labels.Range(func(l labels.Label) {
lb.Set(l.Name, l.Value)
})
return lb.Labels()
}
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
ctx = NewOriginContext(ctx, NewRuleDetail(rule))
@ -85,6 +196,22 @@ func (rule *RecordingRule) Eval(ctx context.Context, queryOffset time.Duration,
return nil, err
}
// Override the metric name and labels.
if err = rule.applyVectorLabels(vector); err != nil {
return nil, err
}
if err = rule.limit(vector, limit); err != nil {
return nil, err
}
rule.SetHealth(HealthGood)
rule.SetLastError(err)
return vector, nil
}
// applyVectorLabels applies labels from the rule and sets the metric name for the vector.
func (rule *RecordingRule) applyVectorLabels(vector promql.Vector) error {
// Override the metric name and labels.
lb := labels.NewBuilder(labels.EmptyLabels())
@ -104,17 +231,55 @@ func (rule *RecordingRule) Eval(ctx context.Context, queryOffset time.Duration,
// Check that the rule does not produce identical metrics after applying
// labels.
if vector.ContainsSameLabelset() {
return nil, errors.New("vector contains metrics with the same labelset after applying rule labels")
return errors.New("vector contains metrics with the same labelset after applying rule labels")
}
return nil
}
// limit ensures that any limits being set on the rules for series limit are enforced.
func (*RecordingRule) limit(vector promql.Vector, limit int) error {
numSeries := len(vector)
if limit > 0 && numSeries > limit {
return nil, fmt.Errorf("exceeded limit of %d with %d series", limit, numSeries)
return fmt.Errorf("exceeded limit of %d with %d series", limit, numSeries)
}
rule.SetHealth(HealthGood)
rule.SetLastError(err)
return vector, nil
return nil
}
func (rule *RecordingRule) loadNameSelectors() {
if rule.vector == nil {
rule.nameSelectors = [][]*labels.Matcher{}
return
}
selectors := parser.ExtractSelectors(rule.vector)
// Only include selectors with __name__ matcher. Otherwise, we would potentially select exemplars across
// all time series.
nameSelectors := make([][]*labels.Matcher, 0, len(selectors))
for _, selector := range selectors {
for _, matcher := range selector {
if matcher.Name == labels.MetricName {
nameSelectors = append(nameSelectors, selector)
break
}
}
}
rule.nameSelectors = nameSelectors
}
func (rule *RecordingRule) hasAlreadyBeenMatchedSuccessfully(seriesHash, exemplarHash uint64) bool {
rule.exemplarsCacheMtx.Lock()
defer rule.exemplarsCacheMtx.Unlock()
_, ok := rule.matchedExemplarHashes[&hashTuple{seriesHash: seriesHash, exemplarHash: exemplarHash}]
return ok
}
func (rule *RecordingRule) recordMatch(seriesHash, exemplarHash uint64) {
rule.exemplarsCacheMtx.Lock()
defer rule.exemplarsCacheMtx.Unlock()
rule.matchedExemplarHashes[&hashTuple{seriesHash: seriesHash, exemplarHash: exemplarHash}] = struct{}{}
}
func (rule *RecordingRule) String() string {

View file

@ -119,6 +119,14 @@ func setUpRuleEvalTest(t require.TestingT) *teststorage.TestStorage {
`)
}
func setUpRuleExemplarsEvalTest(t require.TestingT) *teststorage.TestStorage {
return promqltest.LoadedStorage(t, `
load 1m
metric{label_a="1",label_b="3"} 1 # {trace_id="1234"} 1 0
metric{label_a="2",label_b="4"} 10 # {trace_id="2345"} 10 0
`)
}
func TestRuleEval(t *testing.T) {
storage := setUpRuleEvalTest(t)
t.Cleanup(func() { storage.Close() })
@ -134,6 +142,128 @@ func TestRuleEval(t *testing.T) {
}
}
func TestRuleEvalWithZeroExemplars(t *testing.T) {
storage := setUpRuleEvalTest(t)
t.Cleanup(func() { storage.Close() })
ng := testEngine(t)
for _, scenario := range ruleEvalTestScenarios {
t.Run(scenario.name, func(t *testing.T) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
result, _, err := rule.EvalWithExemplars(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage),
ExemplarQuerierQueryFunc(storage.ExemplarQueryable()), nil, 0)
require.NoError(t, err)
testutil.RequireEqual(t, scenario.expected, result)
})
}
}
func TestRuleEvalWithExemplars(t *testing.T) {
storage := setUpRuleExemplarsEvalTest(t)
t.Cleanup(func() { storage.Close() })
ng := testEngine(t)
interval := 60 * time.Second
for _, scenario := range ruleEvalTestScenarios {
t.Run(scenario.name, func(t *testing.T) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
result, ex, err := rule.EvalWithExemplars(context.TODO(), interval, ruleEvaluationTime.Add(interval), EngineQueryFunc(ng, storage),
ExemplarQuerierQueryFunc(storage.ExemplarQueryable()), nil, 0)
require.NoError(t, err)
require.Len(t, ex, 2)
testutil.RequireEqual(t, scenario.expected, result)
})
}
}
func TestComplexRuleEvalWithExemplars(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
load 1m
requests{label_a="1",label_b="3"} 1 3 5 7 8 # {trace_id="1234"} 1 2
requests{label_a="2",label_b="4"} 10 12 14 16 17 # {trace_id="2345"} 10 2
request_failures{label_a="1",label_b="3"} 1 2 2 3 3
request_failures{label_a="3",label_b="4"} 1 2 2 3 3 # {trace_id="2345"} 1 2
`)
t.Cleanup(func() { storage.Close() })
ng := testEngine(t)
interval := time.Second * 60
expr, _ := parser.ParseExpr(`
sort(
sum without (label_a) (rate(request_failures[2m]))
/
sum without (label_a) (rate(requests[2m]))
)`)
rule := NewRecordingRule("test_rule", expr, labels.FromStrings("label_a", "from_rule"))
result, ex, err := rule.EvalWithExemplars(context.TODO(), interval, ruleEvaluationTime.Add(2*interval), EngineQueryFunc(ng, storage),
ExemplarQuerierQueryFunc(storage.ExemplarQueryable()), nil, 0)
require.NoError(t, err)
require.Len(t, ex, 2)
expected := promql.Vector{
promql.Sample{
Metric: labels.FromStrings("__name__", "test_rule", "label_a", "from_rule", "label_b", "4"),
F: .5,
T: timestamp.FromTime(ruleEvaluationTime.Add(1 * time.Minute)),
},
promql.Sample{
Metric: labels.FromStrings("__name__", "test_rule", "label_a", "from_rule", "label_b", "3"),
F: .6666666666666666,
T: timestamp.FromTime(ruleEvaluationTime.Add(1 * time.Minute)),
},
}
testutil.RequireEqual(t, expected, result)
}
func BenchmarkRuleEvalWithNoExemplars(b *testing.B) {
storage := setUpRuleEvalTest(b)
b.Cleanup(func() { storage.Close() })
ng := testEngine(b)
interval := 60 * time.Second
for _, scenario := range ruleEvalTestScenarios {
b.Run(scenario.name, func(b *testing.B) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := rule.EvalWithExemplars(context.TODO(), interval, ruleEvaluationTime.Add(interval), EngineQueryFunc(ng, storage),
ExemplarQuerierQueryFunc(storage.ExemplarQueryable()), nil, 0)
if err != nil {
require.NoError(b, err)
}
}
})
}
}
func BenchmarkRuleEvalWithExemplars(b *testing.B) {
storage := setUpRuleExemplarsEvalTest(b)
b.Cleanup(func() { storage.Close() })
ng := testEngine(b)
interval := 60 * time.Second
for _, scenario := range ruleEvalTestScenarios {
b.Run(scenario.name, func(b *testing.B) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := rule.EvalWithExemplars(context.TODO(), interval, ruleEvaluationTime.Add(interval), EngineQueryFunc(ng, storage),
ExemplarQuerierQueryFunc(storage.ExemplarQueryable()), nil, 0)
if err != nil {
require.NoError(b, err)
}
}
})
}
}
func BenchmarkRuleEval(b *testing.B) {
storage := setUpRuleEvalTest(b)
b.Cleanup(func() { storage.Close() })

View file

@ -18,6 +18,8 @@ import (
"net/url"
"time"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
@ -41,6 +43,9 @@ type Rule interface {
Labels() labels.Labels
// Eval evaluates the rule, including any associated recording or alerting actions.
Eval(ctx context.Context, queryOffset time.Duration, evaluationTime time.Time, queryFunc QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error)
// EvalWithExemplars evaluates the rule, including any associated recording or alerting actions, and matching exemplars.
EvalWithExemplars(ctx context.Context, queryOffset time.Duration, evaluationTime time.Time, queryFunc QueryFunc,
exemplarQueryFunc ExemplarQueryFunc, externalURL *url.URL, limit int) (promql.Vector, []exemplar.QueryResult, error)
// String returns a human-readable string representation of the rule.
String() string
// Query returns the rule query expression.