prometheus/promql/info.go
Arve Knudsen de16f5e387
[FEATURE] PromQL: Add experimental info function MVP (#14495)
The `info` function is an experiment to improve UX
around including labels from info metrics.
`info` has to be enabled via the feature flag `--enable-feature=promql-experimental-functions`.

This MVP of `info` simplifies the implementation by assuming:
* Only the `target_info` metric is supported
* `target_info`'s identifying labels are `job` and `instance`

Also:
* Encode info samples' original timestamp as sample value
* Deduce info series select hints from top-most VectorSelector
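
For illustration, a usage sketch (the metric and data label names below are hypothetical, and the feature flag above must be enabled): a query such as

    info(rate(http_request_count_total{job="api"}[5m]), {k8s_cluster_name=~".+"})

enriches each rate series with the `k8s_cluster_name` data label from the `target_info` series sharing the same `job` and `instance` labels; without the second argument, all of `target_info`'s data labels are added.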

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
Co-authored-by: Ying WANG <ying.wang@grafana.com>
Co-authored-by: Augustin Husson <augustin.husson@amadeus.com>
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: Björn Rabenstein <github@rabenste.in>
Co-authored-by: Bryan Boreham <bjboreham@gmail.com>
2024-10-16 13:52:11 +01:00

// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package promql

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"strings"

	"github.com/grafana/regexp"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/annotations"
)

const targetInfo = "target_info"

// identifyingLabels are the labels we consider as identifying for info metrics.
// Currently hard coded, so we don't need knowledge of individual info metrics.
var identifyingLabels = []string{"instance", "job"}

// evalInfo implements the info PromQL function.
func (ev *evaluator) evalInfo(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) {
	val, annots := ev.eval(ctx, args[0])
	mat := val.(Matrix)
	// Map from data label name to matchers.
	dataLabelMatchers := map[string][]*labels.Matcher{}
	var infoNameMatchers []*labels.Matcher
	if len(args) > 1 {
		// TODO: Introduce a dedicated LabelSelector type.
		labelSelector := args[1].(*parser.VectorSelector)
		for _, m := range labelSelector.LabelMatchers {
			dataLabelMatchers[m.Name] = append(dataLabelMatchers[m.Name], m)
			if m.Name == labels.MetricName {
				infoNameMatchers = append(infoNameMatchers, m)
			}
		}
	} else {
		infoNameMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, targetInfo)}
	}

	// Don't try to enrich info series.
	ignoreSeries := map[int]struct{}{}
loop:
	for i, s := range mat {
		name := s.Metric.Get(labels.MetricName)
		for _, m := range infoNameMatchers {
			if m.Matches(name) {
				ignoreSeries[i] = struct{}{}
				continue loop
			}
		}
	}

	selectHints := ev.infoSelectHints(args[0])
	infoSeries, ws, err := ev.fetchInfoSeries(ctx, mat, ignoreSeries, dataLabelMatchers, selectHints)
	if err != nil {
		ev.error(err)
	}
	annots.Merge(ws)

	res, ws := ev.combineWithInfoSeries(ctx, mat, infoSeries, ignoreSeries, dataLabelMatchers)
	annots.Merge(ws)
	return res, annots
}

// infoSelectHints calculates the storage.SelectHints for selecting info series, given expr (first argument to info call).
func (ev *evaluator) infoSelectHints(expr parser.Expr) storage.SelectHints {
	var nodeTimestamp *int64
	var offset int64
	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		switch n := node.(type) {
		case *parser.VectorSelector:
			if n.Timestamp != nil {
				nodeTimestamp = n.Timestamp
			}
			offset = durationMilliseconds(n.OriginalOffset)
			return fmt.Errorf("end traversal")
		default:
			return nil
		}
	})

	start := ev.startTimestamp
	end := ev.endTimestamp
	if nodeTimestamp != nil {
		// The timestamp on the selector overrides everything.
		start = *nodeTimestamp
		end = *nodeTimestamp
	}
	// Reduce the start by one fewer ms than the lookback delta
	// because we want to exclude samples that are precisely the
	// lookback delta before the eval time.
	start -= durationMilliseconds(ev.lookbackDelta) - 1
	start -= offset
	end -= offset

	return storage.SelectHints{
		Start: start,
		End:   end,
		Step:  ev.interval,
		Func:  "info",
	}
}

// fetchInfoSeries fetches info series given matching identifying labels in mat.
// Series in ignoreSeries are not fetched.
// dataLabelMatchers may be mutated.
func (ev *evaluator) fetchInfoSeries(ctx context.Context, mat Matrix, ignoreSeries map[int]struct{}, dataLabelMatchers map[string][]*labels.Matcher, selectHints storage.SelectHints) (Matrix, annotations.Annotations, error) {
	// A map of values for all identifying labels we are interested in.
	idLblValues := map[string]map[string]struct{}{}
	for i, s := range mat {
		if _, exists := ignoreSeries[i]; exists {
			continue
		}

		// Register relevant values per identifying label for this series.
		for _, l := range identifyingLabels {
			val := s.Metric.Get(l)
			if val == "" {
				continue
			}

			if idLblValues[l] == nil {
				idLblValues[l] = map[string]struct{}{}
			}
			idLblValues[l][val] = struct{}{}
		}
	}
	if len(idLblValues) == 0 {
		return nil, nil, nil
	}

	// Generate regexps for every interesting value per identifying label.
	var sb strings.Builder
	idLblRegexps := make(map[string]string, len(idLblValues))
	for name, vals := range idLblValues {
		sb.Reset()
		i := 0
		for v := range vals {
			if i > 0 {
				sb.WriteRune('|')
			}
			sb.WriteString(regexp.QuoteMeta(v))
			i++
		}
		idLblRegexps[name] = sb.String()
	}

	var infoLabelMatchers []*labels.Matcher
	for name, re := range idLblRegexps {
		infoLabelMatchers = append(infoLabelMatchers, labels.MustNewMatcher(labels.MatchRegexp, name, re))
	}
	var nameMatcher *labels.Matcher
	for name, ms := range dataLabelMatchers {
		for i, m := range ms {
			if m.Name == labels.MetricName {
				nameMatcher = m
				ms = slices.Delete(ms, i, i+1)
			}
			infoLabelMatchers = append(infoLabelMatchers, m)
		}
		if len(ms) > 0 {
			dataLabelMatchers[name] = ms
		} else {
			delete(dataLabelMatchers, name)
		}
	}
	if nameMatcher == nil {
		// Default to using the target_info metric.
		infoLabelMatchers = append([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, targetInfo)}, infoLabelMatchers...)
	}

	infoIt := ev.querier.Select(ctx, false, &selectHints, infoLabelMatchers...)
	infoSeries, ws, err := expandSeriesSet(ctx, infoIt)
	if err != nil {
		return nil, ws, err
	}

	infoMat := ev.evalSeries(ctx, infoSeries, 0, true)
	return infoMat, ws, nil
}

// combineWithInfoSeries combines mat with select data labels from infoMat.
func (ev *evaluator) combineWithInfoSeries(ctx context.Context, mat, infoMat Matrix, ignoreSeries map[int]struct{}, dataLabelMatchers map[string][]*labels.Matcher) (Matrix, annotations.Annotations) {
	buf := make([]byte, 0, 1024)
	lb := labels.NewScratchBuilder(0)
	sigFunction := func(name string) func(labels.Labels) string {
		return func(lset labels.Labels) string {
			lb.Reset()
			lb.Add(labels.MetricName, name)
			lset.MatchLabels(true, identifyingLabels...).Range(func(l labels.Label) {
				lb.Add(l.Name, l.Value)
			})
			lb.Sort()
			return string(lb.Labels().Bytes(buf))
		}
	}

	infoMetrics := map[string]struct{}{}
	for _, is := range infoMat {
		lblMap := is.Metric.Map()
		infoMetrics[lblMap[labels.MetricName]] = struct{}{}
	}
	sigfs := make(map[string]func(labels.Labels) string, len(infoMetrics))
	for name := range infoMetrics {
		sigfs[name] = sigFunction(name)
	}

	// Keep a copy of the original point slices so they can be returned to the pool.
	origMatrices := []Matrix{
		make(Matrix, len(mat)),
		make(Matrix, len(infoMat)),
	}
	copy(origMatrices[0], mat)
	copy(origMatrices[1], infoMat)

	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
	originalNumSamples := ev.currentSamples

	// Create an output vector that is as big as the input matrix with
	// the most time series.
	biggestLen := max(len(mat), len(infoMat))
	baseVector := make(Vector, 0, len(mat))
	infoVector := make(Vector, 0, len(infoMat))
	enh := &EvalNodeHelper{
		Out: make(Vector, 0, biggestLen),
	}
	type seriesAndTimestamp struct {
		Series
		ts int64
	}
	seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
	tempNumSamples := ev.currentSamples

	// For every base series, compute signature per info metric.
	baseSigs := make([]map[string]string, 0, len(mat))
	for _, s := range mat {
		sigs := make(map[string]string, len(infoMetrics))
		for infoName := range infoMetrics {
			sigs[infoName] = sigfs[infoName](s.Metric)
		}
		baseSigs = append(baseSigs, sigs)
	}

	infoSigs := make([]string, 0, len(infoMat))
	for _, s := range infoMat {
		name := s.Metric.Map()[labels.MetricName]
		infoSigs = append(infoSigs, sigfs[name](s.Metric))
	}

	var warnings annotations.Annotations
	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
		if err := contextDone(ctx, "expression evaluation"); err != nil {
			ev.error(err)
		}

		// Reset number of samples in memory after each timestamp.
		ev.currentSamples = tempNumSamples
		// Gather input vectors for this timestamp.
		baseVector, _ = ev.gatherVector(ts, mat, baseVector, nil, nil)
		infoVector, _ = ev.gatherVector(ts, infoMat, infoVector, nil, nil)

		enh.Ts = ts
		result, err := ev.combineWithInfoVector(baseVector, infoVector, ignoreSeries, baseSigs, infoSigs, enh, dataLabelMatchers)
		if err != nil {
			ev.error(err)
		}
		enh.Out = result[:0] // Reuse result vector.

		vecNumSamples := result.TotalSamples()
		ev.currentSamples += vecNumSamples
		// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
		// needs to include the samples from the result here, as they're still in memory.
		tempNumSamples += vecNumSamples
		ev.samplesStats.UpdatePeak(ev.currentSamples)
		if ev.currentSamples > ev.maxSamples {
			ev.error(ErrTooManySamples(env))
		}

		// Add samples in result vector to output series.
		for _, sample := range result {
			h := sample.Metric.Hash()
			ss, exists := seriess[h]
			if exists {
				if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
					ev.errorf("vector cannot contain metrics with the same labelset")
				}
				ss.ts = ts
			} else {
				ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
			}
			addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps)
			seriess[h] = ss
		}
	}

	// Reuse the original point slices.
	for _, m := range origMatrices {
		for _, s := range m {
			putFPointSlice(s.Floats)
			putHPointSlice(s.Histograms)
		}
	}
	// Assemble the output matrix. By the time we get here we know we don't have too many samples.
	numSamples := 0
	output := make(Matrix, 0, len(seriess))
	for _, ss := range seriess {
		numSamples += len(ss.Floats) + totalHPointSize(ss.Histograms)
		output = append(output, ss.Series)
	}
	ev.currentSamples = originalNumSamples + numSamples
	ev.samplesStats.UpdatePeak(ev.currentSamples)
	return output, warnings
}

// combineWithInfoVector combines base and info Vectors.
// Base series in ignoreSeries are not combined.
func (ev *evaluator) combineWithInfoVector(base, info Vector, ignoreSeries map[int]struct{}, baseSigs []map[string]string, infoSigs []string, enh *EvalNodeHelper, dataLabelMatchers map[string][]*labels.Matcher) (Vector, error) {
	if len(base) == 0 {
		return nil, nil // Short-circuit: nothing is going to match.
	}

	// All samples from the info Vector hashed by the matching label/values.
	if enh.rightSigs == nil {
		enh.rightSigs = make(map[string]Sample, len(enh.Out))
	} else {
		clear(enh.rightSigs)
	}

	for i, s := range info {
		if s.H != nil {
			ev.error(errors.New("info sample should be float"))
		}
		// We encode original info sample timestamps via the float value.
		origT := int64(s.F)

		sig := infoSigs[i]
		if existing, exists := enh.rightSigs[sig]; exists {
			// We encode original info sample timestamps via the float value.
			existingOrigT := int64(existing.F)
			switch {
			case existingOrigT > origT:
				// Keep the other info sample, since it's newer.
			case existingOrigT < origT:
				// Keep this info sample, since it's newer.
				enh.rightSigs[sig] = s
			default:
				// The two info samples have the same timestamp - conflict.
				name := s.Metric.Map()[labels.MetricName]
				ev.errorf("found duplicate series for info metric %s", name)
			}
		} else {
			enh.rightSigs[sig] = s
		}
	}

	for i, bs := range base {
		if _, exists := ignoreSeries[i]; exists {
			// This series should not be enriched with info metric data labels.
			enh.Out = append(enh.Out, Sample{
				Metric: bs.Metric,
				F:      bs.F,
				H:      bs.H,
			})
			continue
		}

		baseLabels := bs.Metric.Map()
		enh.resetBuilder(labels.Labels{})

		// For every info metric name, try to find an info series with the same signature.
		seenInfoMetrics := map[string]struct{}{}
		for infoName, sig := range baseSigs[i] {
			is, exists := enh.rightSigs[sig]
			if !exists {
				continue
			}
			if _, exists := seenInfoMetrics[infoName]; exists {
				continue
			}

			err := is.Metric.Validate(func(l labels.Label) error {
				if l.Name == labels.MetricName {
					return nil
				}
				if _, exists := dataLabelMatchers[l.Name]; len(dataLabelMatchers) > 0 && !exists {
					// Not among the specified data label matchers.
					return nil
				}

				if v := enh.lb.Get(l.Name); v != "" && v != l.Value {
					return fmt.Errorf("conflicting label: %s", l.Name)
				}

				if _, exists := baseLabels[l.Name]; exists {
					// Skip labels already on the base metric.
					return nil
				}

				enh.lb.Set(l.Name, l.Value)
				return nil
			})
			if err != nil {
				return nil, err
			}
			seenInfoMetrics[infoName] = struct{}{}
		}

		infoLbls := enh.lb.Labels()
		if infoLbls.Len() == 0 {
			// If there's at least one data label matcher not matching the empty string,
			// we have to ignore this series as there are no matching info series.
			allMatchersMatchEmpty := true
			for _, ms := range dataLabelMatchers {
				for _, m := range ms {
					if !m.Matches("") {
						allMatchersMatchEmpty = false
						break
					}
				}
			}
			if !allMatchersMatchEmpty {
				continue
			}
		}

		enh.resetBuilder(bs.Metric)
		infoLbls.Range(func(l labels.Label) {
			enh.lb.Set(l.Name, l.Value)
		})

		enh.Out = append(enh.Out, Sample{
			Metric: enh.lb.Labels(),
			F:      bs.F,
			H:      bs.H,
		})
	}
	return enh.Out, nil
}