Add present_over_time (#9097)

* Add present_over_time

Signed-off-by: darshanime <deathbullet@gmail.com>

* Add tests for present_over_time

Signed-off-by: darshanime <deathbullet@gmail.com>

* Address PR comments

Signed-off-by: darshanime <deathbullet@gmail.com>

* Add documentation for present_over_time

Signed-off-by: darshanime <deathbullet@gmail.com>

* Update documentation

Signed-off-by: darshanime <deathbullet@gmail.com>

* Update documentation comment

Signed-off-by: darshanime <deathbullet@gmail.com>
This commit is contained in:
Darshan Chaudhary 2021-07-29 16:08:11 +05:30 committed by GitHub
parent e2db44b653
commit c4f2e9eec5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 78 additions and 6 deletions

View file

@ -430,6 +430,7 @@ over time and return an instant vector with per-series aggregation results:
* `stddev_over_time(range-vector)`: the population standard deviation of the values in the specified interval.
* `stdvar_over_time(range-vector)`: the population standard variance of the values in the specified interval.
* `last_over_time(range-vector)`: the most recent point value in specified interval.
* `present_over_time(range-vector)`: the value 1 for any series in the specified interval.
Note that all values in the specified interval have the same weight in the
aggregation even if the values are not equally spaced throughout the interval.

View file

@ -530,8 +530,6 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag
// Cancel when execution is done or an error was raised.
defer q.cancel()
const env = "query execution"
evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime)
defer evalSpanTimer.Finish()

View file

@ -513,6 +513,13 @@ func funcAbsentOverTime(vals []parser.Value, args parser.Expressions, enh *EvalN
})
}
// === present_over_time(Vector parser.ValueTypeMatrix) Vector ===
func funcPresentOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return aggrOverTime(vals, enh, func(values []Point) float64 {
return 1
})
}
func simpleFunc(vals []parser.Value, enh *EvalNodeHelper, f func(float64) float64) Vector {
for _, el := range vals[0].(Vector) {
enh.Out = append(enh.Out, Sample{
@ -959,6 +966,7 @@ var FunctionCalls = map[string]FunctionCall{
"minute": funcMinute,
"month": funcMonth,
"predict_linear": funcPredictLinear,
"present_over_time": funcPresentOverTime,
"quantile_over_time": funcQuantileOverTime,
"rate": funcRate,
"resets": funcResets,

View file

@ -39,6 +39,11 @@ var Functions = map[string]*Function{
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
},
"present_over_time": {
Name: "present_over_time",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
},
"avg_over_time": {
Name: "avg_over_time",
ArgTypes: []ValueType{ValueTypeMatrix},

View file

@ -583,7 +583,7 @@ func (t *Test) exec(tc testCommand) error {
err = cmd.compareResult(vec)
}
if err != nil {
return errors.Wrapf(err, "error in %s %s (line %d) rande mode", cmd, iq.expr, cmd.line)
return errors.Wrapf(err, "error in %s %s (line %d) range mode", cmd, iq.expr, cmd.line)
}
}

View file

@ -901,6 +901,66 @@ eval instant at 10m absent_over_time({job="ingress"}[4m])
clear
# Testdata for present_over_time()
eval instant at 1m present_over_time(http_requests[5m])
eval instant at 1m present_over_time(http_requests{handler="/foo"}[5m])
eval instant at 1m present_over_time(http_requests{handler!="/foo"}[5m])
eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", handler="/foobar"}[5m])
eval instant at 1m present_over_time(rate(nonexistant[5m])[5m:])
eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", instance="127.0.0.1"}[5m])
load 1m
http_requests{path="/foo",instance="127.0.0.1",job="httpd"} 1+1x10
http_requests{path="/bar",instance="127.0.0.1",job="httpd"} 1+1x10
httpd_handshake_failures_total{instance="127.0.0.1",job="node"} 1+1x15
httpd_log_lines_total{instance="127.0.0.1",job="node"} 1
ssl_certificate_expiry_seconds{job="ingress"} NaN NaN NaN NaN NaN
eval instant at 5m present_over_time(http_requests[5m])
{instance="127.0.0.1", job="httpd", path="/bar"} 1
{instance="127.0.0.1", job="httpd", path="/foo"} 1
eval instant at 5m present_over_time(rate(http_requests[5m])[5m:1m])
{instance="127.0.0.1", job="httpd", path="/bar"} 1
{instance="127.0.0.1", job="httpd", path="/foo"} 1
eval instant at 0m present_over_time(httpd_log_lines_total[30s])
{instance="127.0.0.1",job="node"} 1
eval instant at 1m present_over_time(httpd_log_lines_total[30s])
eval instant at 15m present_over_time(http_requests[5m])
{instance="127.0.0.1", job="httpd", path="/bar"} 1
{instance="127.0.0.1", job="httpd", path="/foo"} 1
eval instant at 16m present_over_time(http_requests[5m])
eval instant at 16m present_over_time(http_requests[6m])
{instance="127.0.0.1", job="httpd", path="/bar"} 1
{instance="127.0.0.1", job="httpd", path="/foo"} 1
eval instant at 16m present_over_time(httpd_handshake_failures_total[1m])
{instance="127.0.0.1", job="node"} 1
eval instant at 16m present_over_time({instance="127.0.0.1"}[5m])
{instance="127.0.0.1",job="node"} 1
eval instant at 21m present_over_time({job="grok"}[20m])
eval instant at 30m present_over_time({instance="127.0.0.1"}[5m:5s])
eval instant at 5m present_over_time({job="ingress"}[4m])
{job="ingress"} 1
eval instant at 10m present_over_time({job="ingress"}[4m])
clear
# Testing exp() sqrt() log2() log10() ln()
load 5m
exp_root_log{l="x"} 10

View file

@ -170,8 +170,8 @@ func (m Matrix) Len() int { return len(m) }
func (m Matrix) Less(i, j int) bool { return labels.Compare(m[i].Metric, m[j].Metric) < 0 }
func (m Matrix) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// ContainsSameLabelset checks if a matrix has samples with the same labelset
// Such a behavior is semantically undefined
// ContainsSameLabelset checks if a matrix has samples with the same labelset.
// Such a behavior is semantically undefined.
// https://github.com/prometheus/prometheus/issues/4562
func (m Matrix) ContainsSameLabelset() bool {
l := make(map[uint64]struct{}, len(m))

View file

@ -144,7 +144,7 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
// newAppendID increments the transaction counter and returns a new transaction
// ID. The first ID returned is 1.
// Also returns the low watermark, to keep lock/unlock operations down
// Also returns the low watermark, to keep lock/unlock operations down.
func (i *isolation) newAppendID() (uint64, uint64) {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()