mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Merge branch 'master' of https://github.com/prometheus/prometheus into update_k8s
This commit is contained in:
commit
aa94efe4b5
|
@ -1,5 +1,6 @@
|
|||
# Prometheus [![Build Status](https://travis-ci.org/prometheus/prometheus.svg)][travis]
|
||||
# Prometheus
|
||||
|
||||
[![Build Status](https://travis-ci.org/prometheus/prometheus.svg)][travis]
|
||||
[![CircleCI](https://circleci.com/gh/prometheus/prometheus/tree/master.svg?style=shield)][circleci]
|
||||
[![Docker Repository on Quay](https://quay.io/repository/prometheus/prometheus/status)][quay]
|
||||
[![Docker Pulls](https://img.shields.io/docker/pulls/prom/prometheus.svg?maxAge=604800)][hub]
|
||||
|
|
|
@ -164,6 +164,10 @@ func main() {
|
|||
"Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period).").
|
||||
Hidden().PlaceHolder("<duration>").SetValue(&cfg.tsdb.MaxBlockDuration)
|
||||
|
||||
a.Flag("storage.tsdb.wal-segment-size",
|
||||
"Size at which to split the tsdb WAL segment files (e.g. 100MB)").
|
||||
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.WALSegmentSize)
|
||||
|
||||
a.Flag("storage.tsdb.retention", "How long to retain samples in storage.").
|
||||
Default("15d").SetValue(&cfg.tsdb.Retention)
|
||||
|
||||
|
@ -233,6 +237,7 @@ func main() {
|
|||
}
|
||||
|
||||
promql.LookbackDelta = time.Duration(cfg.lookbackDelta)
|
||||
promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval))
|
||||
|
||||
logger := promlog.New(&cfg.promlogConfig)
|
||||
|
||||
|
@ -559,6 +564,11 @@ func main() {
|
|||
g.Add(
|
||||
func() error {
|
||||
level.Info(logger).Log("msg", "Starting TSDB ...")
|
||||
if cfg.tsdb.WALSegmentSize != 0 {
|
||||
if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
|
||||
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
|
||||
}
|
||||
}
|
||||
db, err := tsdb.Open(
|
||||
cfg.localStoragePath,
|
||||
log.With(logger, "component", "tsdb"),
|
||||
|
@ -654,6 +664,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
|
|||
if failed {
|
||||
return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
|
||||
}
|
||||
promql.SetDefaultEvaluationInterval(time.Duration(conf.GlobalConfig.EvaluationInterval))
|
||||
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -247,3 +247,36 @@ func TestSendAlerts(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWALSegmentSizeBounds(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
for size, expectedExitStatus := range map[string]int{"9MB": 1, "257MB": 1, "10": 2, "1GB": 1, "12MB": 0} {
|
||||
prom := exec.Command(promPath, "--storage.tsdb.wal-segment-size="+size, "--config.file="+promConfig)
|
||||
err := prom.Start()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
if expectedExitStatus == 0 {
|
||||
done := make(chan error, 1)
|
||||
go func() { done <- prom.Wait() }()
|
||||
select {
|
||||
case err := <-done:
|
||||
t.Errorf("prometheus should be still running: %v", err)
|
||||
case <-time.After(5 * time.Second):
|
||||
prom.Process.Signal(os.Interrupt)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
err = prom.Wait()
|
||||
testutil.NotOk(t, err, "")
|
||||
if exitError, ok := err.(*exec.ExitError); ok {
|
||||
status := exitError.Sys().(syscall.WaitStatus)
|
||||
testutil.Equals(t, expectedExitStatus, status.ExitStatus())
|
||||
} else {
|
||||
t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -311,6 +311,12 @@ Outer:
|
|||
})
|
||||
}
|
||||
|
||||
sort.Slice(expSamples, func(i, j int) bool {
|
||||
return labels.Compare(expSamples[i].Labels, expSamples[j].Labels) <= 0
|
||||
})
|
||||
sort.Slice(gotSamples, func(i, j int) bool {
|
||||
return labels.Compare(gotSamples[i].Labels, gotSamples[j].Labels) <= 0
|
||||
})
|
||||
if !reflect.DeepEqual(expSamples, gotSamples) {
|
||||
errs = append(errs, fmt.Errorf(" expr:'%s', time:%s, \n exp:%#v, \n got:%#v", testCase.Expr,
|
||||
testCase.EvalTime.String(), parsedSamplesString(expSamples), parsedSamplesString(gotSamples)))
|
||||
|
|
|
@ -104,7 +104,7 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
return err
|
||||
}
|
||||
if c.Role == "" {
|
||||
return fmt.Errorf("role missing (one of: pod, service, endpoints, node)")
|
||||
return fmt.Errorf("role missing (one of: pod, service, endpoints, node, ingress)")
|
||||
}
|
||||
if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 {
|
||||
return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured")
|
||||
|
@ -137,7 +137,7 @@ func init() {
|
|||
prometheus.MustRegister(eventCount)
|
||||
|
||||
// Initialize metric vectors.
|
||||
for _, role := range []string{"endpoints", "node", "pod", "service"} {
|
||||
for _, role := range []string{"endpoints", "node", "pod", "service", "ingress"} {
|
||||
for _, evt := range []string{"add", "delete", "update"} {
|
||||
eventCount.WithLabelValues(role, evt)
|
||||
}
|
||||
|
|
|
@ -1028,7 +1028,8 @@ Additional labels prefixed with `__meta_` may be available during the
|
|||
relabeling phase. They are set by the service discovery mechanism that provided
|
||||
the target and vary between mechanisms.
|
||||
|
||||
Labels starting with `__` will be removed from the label set after relabeling is completed.
|
||||
Labels starting with `__` will be removed from the label set after target
|
||||
relabeling is completed.
|
||||
|
||||
If a relabeling step needs to store a label value only temporarily (as the
|
||||
input to a subsequent relabeling step), use the `__tmp` label name prefix. This
|
||||
|
|
|
@ -222,8 +222,7 @@ groups:
|
|||
expr: avg(rate(rpc_durations_seconds_count[5m])) by (job, service)
|
||||
```
|
||||
|
||||
To make Prometheus pick up this new rule, add a `rule_files` statement to the
|
||||
`global` configuration section in your `prometheus.yml`. The config should now
|
||||
To make Prometheus pick up this new rule, add a `rule_files` statement in your `prometheus.yml`. The config should now
|
||||
look like this:
|
||||
|
||||
```yaml
|
||||
|
|
|
@ -20,7 +20,7 @@ the respective repository.
|
|||
|
||||
All Prometheus services are available as Docker images on
|
||||
[Quay.io](https://quay.io/repository/prometheus/prometheus) or
|
||||
[Docker Hub[(https://hub.docker.com/u/prom/).
|
||||
[Docker Hub](https://hub.docker.com/u/prom/).
|
||||
|
||||
Running Prometheus on Docker is as simple as `docker run -p 9090:9090
|
||||
prom/prometheus`. This starts Prometheus with a sample
|
||||
|
|
|
@ -170,6 +170,14 @@ The same works for range vectors. This returns the 5-minutes rate that
|
|||
|
||||
rate(http_requests_total[5m] offset 1w)
|
||||
|
||||
## Subquery
|
||||
|
||||
Subquery allows you to run an instant query for a given range and resolution. The result of a subquery is a range vector.
|
||||
|
||||
Syntax: `<instant_query> '[' <range> ':' [<resolution>] ']' [ offset <duration> ]`
|
||||
|
||||
* `<resolution>` is optional. Default is the global evaluation interval.
|
||||
|
||||
## Operators
|
||||
|
||||
Prometheus supports many binary and aggregation operators. These are described
|
||||
|
|
|
@ -38,6 +38,16 @@ To select all HTTP status codes except 4xx ones, you could run:
|
|||
|
||||
http_requests_total{status!~"4.."}
|
||||
|
||||
## Subquery
|
||||
|
||||
This query returns 5-minute rate of `http_requests_total` metric for the past 30 minutes, at a resolution of 1 minute.
|
||||
|
||||
rate(http_requests_total[5m])[30m:1m]
|
||||
|
||||
This is an example of nested subquery. The subquery for the `deriv` function uses default resolution. Note that using subqueries unnecessarily is unwise.
|
||||
|
||||
max_over_time(deriv(rate(distance_covered_total[5s])[30s:5s])[10m:])
|
||||
|
||||
## Using functions, operators, etc.
|
||||
|
||||
Return the per-second rate for all time series with the `http_requests_total`
|
||||
|
|
3
go.mod
3
go.mod
|
@ -5,6 +5,7 @@ require (
|
|||
github.com/Azure/go-autorest v10.8.1+incompatible
|
||||
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f // indirect
|
||||
github.com/VividCortex/ewma v1.1.1 // indirect
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
|
||||
github.com/aws/aws-sdk-go v0.0.0-20180507225419-00862f899353
|
||||
github.com/biogo/store v0.0.0-20160505134755-913427a1d5e8 // indirect
|
||||
|
@ -88,7 +89,7 @@ require (
|
|||
github.com/prometheus/client_golang v0.9.1
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
|
||||
github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea
|
||||
github.com/prometheus/tsdb v0.3.1
|
||||
github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
|
||||
github.com/rlmcpherson/s3gof3r v0.5.0 // indirect
|
||||
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -209,8 +209,8 @@ github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea h1:4RkbEb5XX0Wvu
|
|||
github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/prometheus/tsdb v0.3.1 h1:uGgfubT2MesNpx3T46c5R32RcUoKAPGyWX+4x1orJLE=
|
||||
github.com/prometheus/tsdb v0.3.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc h1:phU3kj067sczIc4fhaq5rRcH4Lp9A45MsrcQqjC+cao=
|
||||
github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k=
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:generate go get github.com/cznic/golex
|
||||
//go:generate go get -u modernc.org/golex
|
||||
//go:generate golex -o=openmetricslex.l.go openmetricslex.l
|
||||
|
||||
package textparse
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:generate go get github.com/cznic/golex
|
||||
//go:generate go get -u modernc.org/golex
|
||||
//go:generate golex -o=promlex.l.go promlex.l
|
||||
|
||||
package textparse
|
||||
|
|
|
@ -116,6 +116,14 @@ type MatrixSelector struct {
|
|||
series []storage.Series
|
||||
}
|
||||
|
||||
// SubqueryExpr represents a subquery.
|
||||
type SubqueryExpr struct {
|
||||
Expr Expr
|
||||
Range time.Duration
|
||||
Offset time.Duration
|
||||
Step time.Duration
|
||||
}
|
||||
|
||||
// NumberLiteral represents a number.
|
||||
type NumberLiteral struct {
|
||||
Val float64
|
||||
|
@ -153,6 +161,7 @@ type VectorSelector struct {
|
|||
func (e *AggregateExpr) Type() ValueType { return ValueTypeVector }
|
||||
func (e *Call) Type() ValueType { return e.Func.ReturnType }
|
||||
func (e *MatrixSelector) Type() ValueType { return ValueTypeMatrix }
|
||||
func (e *SubqueryExpr) Type() ValueType { return ValueTypeMatrix }
|
||||
func (e *NumberLiteral) Type() ValueType { return ValueTypeScalar }
|
||||
func (e *ParenExpr) Type() ValueType { return e.Expr.Type() }
|
||||
func (e *StringLiteral) Type() ValueType { return ValueTypeString }
|
||||
|
@ -169,6 +178,7 @@ func (*AggregateExpr) expr() {}
|
|||
func (*BinaryExpr) expr() {}
|
||||
func (*Call) expr() {}
|
||||
func (*MatrixSelector) expr() {}
|
||||
func (*SubqueryExpr) expr() {}
|
||||
func (*NumberLiteral) expr() {}
|
||||
func (*ParenExpr) expr() {}
|
||||
func (*StringLiteral) expr() {}
|
||||
|
@ -267,6 +277,11 @@ func Walk(v Visitor, node Node, path []Node) error {
|
|||
return err
|
||||
}
|
||||
|
||||
case *SubqueryExpr:
|
||||
if err := Walk(v, n.Expr, path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case *ParenExpr:
|
||||
if err := Walk(v, n.Expr, path); err != nil {
|
||||
return err
|
||||
|
|
153
promql/engine.go
153
promql/engine.go
|
@ -23,6 +23,7 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
|
@ -51,6 +52,26 @@ const (
|
|||
minInt64 = -9223372036854775808
|
||||
)
|
||||
|
||||
var (
|
||||
// LookbackDelta determines the time since the last sample after which a time
|
||||
// series is considered stale.
|
||||
LookbackDelta = 5 * time.Minute
|
||||
|
||||
// DefaultEvaluationInterval is the default evaluation interval of
|
||||
// a subquery in milliseconds.
|
||||
DefaultEvaluationInterval int64
|
||||
)
|
||||
|
||||
// SetDefaultEvaluationInterval sets DefaultEvaluationInterval.
|
||||
func SetDefaultEvaluationInterval(ev time.Duration) {
|
||||
atomic.StoreInt64(&DefaultEvaluationInterval, durationToInt64Millis(ev))
|
||||
}
|
||||
|
||||
// GetDefaultEvaluationInterval returns the DefaultEvaluationInterval as time.Duration.
|
||||
func GetDefaultEvaluationInterval() int64 {
|
||||
return atomic.LoadInt64(&DefaultEvaluationInterval)
|
||||
}
|
||||
|
||||
type engineMetrics struct {
|
||||
currentQueries prometheus.Gauge
|
||||
maxConcurrentQueries prometheus.Gauge
|
||||
|
@ -404,12 +425,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
if s.Start == s.End && s.Interval == 0 {
|
||||
start := timeMilliseconds(s.Start)
|
||||
evaluator := &evaluator{
|
||||
startTimestamp: start,
|
||||
endTimestamp: start,
|
||||
interval: 1,
|
||||
ctx: ctx,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
logger: ng.logger,
|
||||
startTimestamp: start,
|
||||
endTimestamp: start,
|
||||
interval: 1,
|
||||
ctx: ctx,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
defaultEvalInterval: GetDefaultEvaluationInterval(),
|
||||
logger: ng.logger,
|
||||
}
|
||||
val, err := evaluator.Eval(s.Expr)
|
||||
if err != nil {
|
||||
|
@ -445,12 +467,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
|
||||
// Range evaluation.
|
||||
evaluator := &evaluator{
|
||||
startTimestamp: timeMilliseconds(s.Start),
|
||||
endTimestamp: timeMilliseconds(s.End),
|
||||
interval: durationMilliseconds(s.Interval),
|
||||
ctx: ctx,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
logger: ng.logger,
|
||||
startTimestamp: timeMilliseconds(s.Start),
|
||||
endTimestamp: timeMilliseconds(s.End),
|
||||
interval: durationMilliseconds(s.Interval),
|
||||
ctx: ctx,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
defaultEvalInterval: GetDefaultEvaluationInterval(),
|
||||
logger: ng.logger,
|
||||
}
|
||||
val, err := evaluator.Eval(s.Expr)
|
||||
if err != nil {
|
||||
|
@ -477,23 +500,36 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
return mat, nil, warnings
|
||||
}
|
||||
|
||||
// cumulativeSubqueryOffset returns the sum of range and offset of all subqueries in the path.
|
||||
func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {
|
||||
var subqOffset time.Duration
|
||||
for _, node := range path {
|
||||
switch n := node.(type) {
|
||||
case *SubqueryExpr:
|
||||
subqOffset += n.Range + n.Offset
|
||||
}
|
||||
}
|
||||
return subqOffset
|
||||
}
|
||||
|
||||
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error, storage.Warnings) {
|
||||
var maxOffset time.Duration
|
||||
Inspect(s.Expr, func(node Node, _ []Node) error {
|
||||
Inspect(s.Expr, func(node Node, path []Node) error {
|
||||
subqOffset := ng.cumulativeSubqueryOffset(path)
|
||||
switch n := node.(type) {
|
||||
case *VectorSelector:
|
||||
if maxOffset < LookbackDelta {
|
||||
maxOffset = LookbackDelta
|
||||
if maxOffset < LookbackDelta+subqOffset {
|
||||
maxOffset = LookbackDelta + subqOffset
|
||||
}
|
||||
if n.Offset+LookbackDelta > maxOffset {
|
||||
maxOffset = n.Offset + LookbackDelta
|
||||
if n.Offset+LookbackDelta+subqOffset > maxOffset {
|
||||
maxOffset = n.Offset + LookbackDelta + subqOffset
|
||||
}
|
||||
case *MatrixSelector:
|
||||
if maxOffset < n.Range {
|
||||
maxOffset = n.Range
|
||||
if maxOffset < n.Range+subqOffset {
|
||||
maxOffset = n.Range + subqOffset
|
||||
}
|
||||
if n.Offset+n.Range > maxOffset {
|
||||
maxOffset = n.Offset + n.Range
|
||||
if n.Offset+n.Range+subqOffset > maxOffset {
|
||||
maxOffset = n.Offset + n.Range + subqOffset
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -514,7 +550,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
|||
params := &storage.SelectParams{
|
||||
Start: timestamp.FromTime(s.Start),
|
||||
End: timestamp.FromTime(s.End),
|
||||
Step: int64(s.Interval / time.Millisecond),
|
||||
Step: durationToInt64Millis(s.Interval),
|
||||
}
|
||||
|
||||
switch n := node.(type) {
|
||||
|
@ -527,7 +563,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
|||
params.End = params.End - offsetMilliseconds
|
||||
}
|
||||
|
||||
set, err, wrn = querier.Select(params, n.LabelMatchers...)
|
||||
set, wrn, err = querier.Select(params, n.LabelMatchers...)
|
||||
warnings = append(warnings, wrn...)
|
||||
if err != nil {
|
||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||
|
@ -546,7 +582,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
|||
params.End = params.End - offsetMilliseconds
|
||||
}
|
||||
|
||||
set, err, wrn = querier.Select(params, n.LabelMatchers...)
|
||||
set, wrn, err = querier.Select(params, n.LabelMatchers...)
|
||||
warnings = append(warnings, wrn...)
|
||||
if err != nil {
|
||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||
|
@ -624,9 +660,10 @@ type evaluator struct {
|
|||
endTimestamp int64 // End time in milliseconds.
|
||||
interval int64 // Interval in milliseconds.
|
||||
|
||||
maxSamples int
|
||||
currentSamples int
|
||||
logger log.Logger
|
||||
maxSamples int
|
||||
currentSamples int
|
||||
defaultEvalInterval int64
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// errorf causes a panic with the input formatted into an error.
|
||||
|
@ -839,6 +876,21 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ..
|
|||
return mat
|
||||
}
|
||||
|
||||
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
|
||||
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
|
||||
func (ev *evaluator) evalSubquery(subq *SubqueryExpr) *MatrixSelector {
|
||||
val := ev.eval(subq).(Matrix)
|
||||
ms := &MatrixSelector{
|
||||
Range: subq.Range,
|
||||
Offset: subq.Offset,
|
||||
series: make([]storage.Series, 0, len(val)),
|
||||
}
|
||||
for _, s := range val {
|
||||
ms.series = append(ms.series, NewStorageSeries(s))
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
// eval evaluates the given expression as the given AST expression node requires.
|
||||
func (ev *evaluator) eval(expr Expr) Value {
|
||||
// This is the top-level evaluation method.
|
||||
|
@ -880,12 +932,19 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||
var matrixArgIndex int
|
||||
var matrixArg bool
|
||||
for i, a := range e.Args {
|
||||
_, ok := a.(*MatrixSelector)
|
||||
if ok {
|
||||
if _, ok := a.(*MatrixSelector); ok {
|
||||
matrixArgIndex = i
|
||||
matrixArg = true
|
||||
break
|
||||
}
|
||||
// SubqueryExpr can be used in place of MatrixSelector.
|
||||
if subq, ok := a.(*SubqueryExpr); ok {
|
||||
matrixArgIndex = i
|
||||
matrixArg = true
|
||||
// Replacing SubqueryExpr with MatrixSelector.
|
||||
e.Args[i] = ev.evalSubquery(subq)
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matrixArg {
|
||||
// Does not have a matrix argument.
|
||||
|
@ -1077,11 +1136,43 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||
panic(fmt.Errorf("cannot do range evaluation of matrix selector"))
|
||||
}
|
||||
return ev.matrixSelector(e)
|
||||
|
||||
case *SubqueryExpr:
|
||||
offsetMillis := durationToInt64Millis(e.Offset)
|
||||
rangeMillis := durationToInt64Millis(e.Range)
|
||||
newEv := &evaluator{
|
||||
endTimestamp: ev.endTimestamp - offsetMillis,
|
||||
interval: ev.defaultEvalInterval,
|
||||
ctx: ev.ctx,
|
||||
currentSamples: ev.currentSamples,
|
||||
maxSamples: ev.maxSamples,
|
||||
defaultEvalInterval: ev.defaultEvalInterval,
|
||||
logger: ev.logger,
|
||||
}
|
||||
|
||||
if e.Step != 0 {
|
||||
newEv.interval = durationToInt64Millis(e.Step)
|
||||
}
|
||||
|
||||
// Start with the first timestamp after (ev.startTimestamp - offset - range)
|
||||
// that is aligned with the step (multiple of 'newEv.interval').
|
||||
newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval)
|
||||
if newEv.startTimestamp < (ev.startTimestamp - offsetMillis - rangeMillis) {
|
||||
newEv.startTimestamp += newEv.interval
|
||||
}
|
||||
|
||||
res := newEv.eval(e.Expr)
|
||||
ev.currentSamples = newEv.currentSamples
|
||||
return res
|
||||
}
|
||||
|
||||
panic(fmt.Errorf("unhandled expression of type: %T", expr))
|
||||
}
|
||||
|
||||
func durationToInt64Millis(d time.Duration) int64 {
|
||||
return int64(d / time.Millisecond)
|
||||
}
|
||||
|
||||
// vectorSelector evaluates a *VectorSelector expression.
|
||||
func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
|
||||
if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil {
|
||||
|
@ -1825,10 +1916,6 @@ func shouldDropMetricName(op ItemType) bool {
|
|||
}
|
||||
}
|
||||
|
||||
// LookbackDelta determines the time since the last sample after which a time
|
||||
// series is considered stale.
|
||||
var LookbackDelta = 5 * time.Minute
|
||||
|
||||
// documentedType returns the internal type to the equivalent
|
||||
// user facing terminology as defined in the documentation.
|
||||
func documentedType(t ValueType) string {
|
||||
|
|
|
@ -169,8 +169,8 @@ type errQuerier struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
|
||||
return errSeriesSet{err: q.err}, q.err, nil
|
||||
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return errSeriesSet{err: q.err}, nil, q.err
|
||||
}
|
||||
func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
|
||||
func (*errQuerier) LabelNames() ([]string, error) { return nil, nil }
|
||||
|
@ -475,6 +475,34 @@ load 10s
|
|||
},
|
||||
Start: time.Unix(10, 0),
|
||||
},
|
||||
{
|
||||
Query: "rate(metric[20s])",
|
||||
MaxSamples: 3,
|
||||
Result: Result{
|
||||
nil,
|
||||
Vector{
|
||||
Sample{
|
||||
Point: Point{V: 0.1, T: 10000},
|
||||
Metric: labels.Labels{},
|
||||
},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s]",
|
||||
MaxSamples: 3,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s]",
|
||||
MaxSamples: 0,
|
||||
|
@ -624,3 +652,265 @@ func TestRecoverEvaluatorError(t *testing.T) {
|
|||
|
||||
panic(e)
|
||||
}
|
||||
|
||||
func TestSubquerySelector(t *testing.T) {
|
||||
tests := []struct {
|
||||
loadString string
|
||||
cases []struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}
|
||||
}{
|
||||
{
|
||||
loadString: `load 10s
|
||||
metric 1 2`,
|
||||
cases: []struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}{
|
||||
{
|
||||
Query: "metric[20s:10s]",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s]",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s] offset 2s",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(12, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s] offset 6s",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(20, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s] offset 4s",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}, {V: 2, T: 30000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(35, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s] offset 5s",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 2, T: 10000}, {V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}, {V: 2, T: 30000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(35, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s] offset 6s",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 2, T: 10000}, {V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(35, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s:5s] offset 7s",
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 2, T: 10000}, {V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(35, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
loadString: `load 10s
|
||||
http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000
|
||||
http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000
|
||||
http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000
|
||||
http_requests{job="api-server", instance="1", group="canary"} 0+40x2000`,
|
||||
cases: []struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}{
|
||||
{ // Normal selector.
|
||||
Query: `http_requests{group=~"pro.*",instance="0"}[30s:10s]`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 9990, T: 9990000}, {V: 10000, T: 10000000}, {V: 100, T: 10010000}, {V: 130, T: 10020000}},
|
||||
Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10020, 0),
|
||||
},
|
||||
{ // Default step.
|
||||
Query: `http_requests{group=~"pro.*",instance="0"}[5m:]`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 9840, T: 9840000}, {V: 9900, T: 9900000}, {V: 9960, T: 9960000}, {V: 130, T: 10020000}, {V: 310, T: 10080000}},
|
||||
Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10100, 0),
|
||||
},
|
||||
{ // Checking if high offset (>LookbackDelta) is being taken care of.
|
||||
Query: `http_requests{group=~"pro.*",instance="0"}[5m:] offset 20m`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 8640, T: 8640000}, {V: 8700, T: 8700000}, {V: 8760, T: 8760000}, {V: 8820, T: 8820000}, {V: 8880, T: 8880000}},
|
||||
Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production")},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(10100, 0),
|
||||
},
|
||||
{
|
||||
Query: `rate(http_requests[1m])[15s:5s]`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{
|
||||
Series{
|
||||
Points: []Point{{V: 3, T: 7985000}, {V: 3, T: 7990000}, {V: 3, T: 7995000}, {V: 3, T: 8000000}},
|
||||
Metric: labels.FromStrings("job", "api-server", "instance", "0", "group", "canary"),
|
||||
},
|
||||
Series{
|
||||
Points: []Point{{V: 4, T: 7985000}, {V: 4, T: 7990000}, {V: 4, T: 7995000}, {V: 4, T: 8000000}},
|
||||
Metric: labels.FromStrings("job", "api-server", "instance", "1", "group", "canary"),
|
||||
},
|
||||
Series{
|
||||
Points: []Point{{V: 1, T: 7985000}, {V: 1, T: 7990000}, {V: 1, T: 7995000}, {V: 1, T: 8000000}},
|
||||
Metric: labels.FromStrings("job", "api-server", "instance", "0", "group", "production"),
|
||||
},
|
||||
Series{
|
||||
Points: []Point{{V: 2, T: 7985000}, {V: 2, T: 7990000}, {V: 2, T: 7995000}, {V: 2, T: 8000000}},
|
||||
Metric: labels.FromStrings("job", "api-server", "instance", "1", "group", "production"),
|
||||
},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(8000, 0),
|
||||
},
|
||||
{
|
||||
Query: `sum(http_requests{group=~"pro.*"})[30s:10s]`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 270, T: 90000}, {V: 300, T: 100000}, {V: 330, T: 110000}, {V: 360, T: 120000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(120, 0),
|
||||
},
|
||||
{
|
||||
Query: `sum(http_requests)[40s:10s]`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 800, T: 80000}, {V: 900, T: 90000}, {V: 1000, T: 100000}, {V: 1100, T: 110000}, {V: 1200, T: 120000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(120, 0),
|
||||
},
|
||||
{
|
||||
Query: `(sum(http_requests{group=~"p.*"})+sum(http_requests{group=~"c.*"}))[20s:5s]`,
|
||||
Result: Result{
|
||||
nil,
|
||||
Matrix{Series{
|
||||
Points: []Point{{V: 1000, T: 100000}, {V: 1000, T: 105000}, {V: 1100, T: 110000}, {V: 1100, T: 115000}, {V: 1200, T: 120000}},
|
||||
Metric: labels.Labels{}},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
Start: time.Unix(120, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
SetDefaultEvaluationInterval(1 * time.Minute)
|
||||
for _, tst := range tests {
|
||||
test, err := NewTest(t, tst.loadString)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating test: %q", err)
|
||||
}
|
||||
defer test.Close()
|
||||
|
||||
err = test.Run()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error initializing test: %q", err)
|
||||
}
|
||||
|
||||
engine := test.QueryEngine()
|
||||
for _, c := range tst.cases {
|
||||
var err error
|
||||
var qry Query
|
||||
|
||||
qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating query: %q", err)
|
||||
}
|
||||
res := qry.Exec(test.Context())
|
||||
if res.Err != nil && res.Err != c.Result.Err {
|
||||
t.Fatalf("unexpected error running query: %q, expected to get result: %q", res.Err, c.Result.Value)
|
||||
}
|
||||
if !reflect.DeepEqual(res.Value, c.Result.Value) {
|
||||
t.Fatalf("unexpected result for query %q: got %q wanted %q", c.Query, res.Value.String(), c.Result.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -137,6 +137,7 @@ const (
|
|||
itemRightBracket
|
||||
itemComma
|
||||
itemAssign
|
||||
itemColon
|
||||
itemSemicolon
|
||||
itemString
|
||||
itemNumber
|
||||
|
@ -235,6 +236,7 @@ var itemTypeStr = map[ItemType]string{
|
|||
itemRightBracket: "]",
|
||||
itemComma: ",",
|
||||
itemAssign: "=",
|
||||
itemColon: ":",
|
||||
itemSemicolon: ";",
|
||||
itemBlank: "_",
|
||||
itemTimes: "x",
|
||||
|
@ -326,6 +328,7 @@ type lexer struct {
|
|||
parenDepth int // Nesting depth of ( ) exprs.
|
||||
braceOpen bool // Whether a { is opened.
|
||||
bracketOpen bool // Whether a [ is opened.
|
||||
gotColon bool // Whether we got a ':' after [ was opened.
|
||||
stringOpen rune // Quote rune of the string currently being read.
|
||||
|
||||
// seriesDesc is set when a series description for the testing
|
||||
|
@ -517,8 +520,15 @@ func lexStatements(l *lexer) stateFn {
|
|||
l.stringOpen = r
|
||||
return lexRawString
|
||||
case isAlpha(r) || r == ':':
|
||||
l.backup()
|
||||
return lexKeywordOrIdentifier
|
||||
if !l.bracketOpen {
|
||||
l.backup()
|
||||
return lexKeywordOrIdentifier
|
||||
}
|
||||
if l.gotColon {
|
||||
return l.errorf("unexpected colon %q", r)
|
||||
}
|
||||
l.emit(itemColon)
|
||||
l.gotColon = true
|
||||
case r == '(':
|
||||
l.emit(itemLeftParen)
|
||||
l.parenDepth++
|
||||
|
@ -538,6 +548,7 @@ func lexStatements(l *lexer) stateFn {
|
|||
if l.bracketOpen {
|
||||
return l.errorf("unexpected left bracket %q", r)
|
||||
}
|
||||
l.gotColon = false
|
||||
l.emit(itemLeftBracket)
|
||||
l.bracketOpen = true
|
||||
return lexDuration
|
||||
|
|
|
@ -429,6 +429,167 @@ var tests = []struct {
|
|||
},
|
||||
seriesDesc: true,
|
||||
},
|
||||
// Test subquery.
|
||||
{
|
||||
input: `test_name{on!~"bar"}[4m:4s]`,
|
||||
expected: []item{
|
||||
{itemIdentifier, 0, `test_name`},
|
||||
{itemLeftBrace, 9, `{`},
|
||||
{itemIdentifier, 10, `on`},
|
||||
{itemNEQRegex, 12, `!~`},
|
||||
{itemString, 14, `"bar"`},
|
||||
{itemRightBrace, 19, `}`},
|
||||
{itemLeftBracket, 20, `[`},
|
||||
{itemDuration, 21, `4m`},
|
||||
{itemColon, 23, `:`},
|
||||
{itemDuration, 24, `4s`},
|
||||
{itemRightBracket, 26, `]`},
|
||||
},
|
||||
},
|
||||
{
|
||||
input: `test:name{on!~"bar"}[4m:4s]`,
|
||||
expected: []item{
|
||||
{itemMetricIdentifier, 0, `test:name`},
|
||||
{itemLeftBrace, 9, `{`},
|
||||
{itemIdentifier, 10, `on`},
|
||||
{itemNEQRegex, 12, `!~`},
|
||||
{itemString, 14, `"bar"`},
|
||||
{itemRightBrace, 19, `}`},
|
||||
{itemLeftBracket, 20, `[`},
|
||||
{itemDuration, 21, `4m`},
|
||||
{itemColon, 23, `:`},
|
||||
{itemDuration, 24, `4s`},
|
||||
{itemRightBracket, 26, `]`},
|
||||
},
|
||||
}, {
|
||||
input: `test:name{on!~"b:ar"}[4m:4s]`,
|
||||
expected: []item{
|
||||
{itemMetricIdentifier, 0, `test:name`},
|
||||
{itemLeftBrace, 9, `{`},
|
||||
{itemIdentifier, 10, `on`},
|
||||
{itemNEQRegex, 12, `!~`},
|
||||
{itemString, 14, `"b:ar"`},
|
||||
{itemRightBrace, 20, `}`},
|
||||
{itemLeftBracket, 21, `[`},
|
||||
{itemDuration, 22, `4m`},
|
||||
{itemColon, 24, `:`},
|
||||
{itemDuration, 25, `4s`},
|
||||
{itemRightBracket, 27, `]`},
|
||||
},
|
||||
}, {
|
||||
input: `test:name{on!~"b:ar"}[4m:]`,
|
||||
expected: []item{
|
||||
{itemMetricIdentifier, 0, `test:name`},
|
||||
{itemLeftBrace, 9, `{`},
|
||||
{itemIdentifier, 10, `on`},
|
||||
{itemNEQRegex, 12, `!~`},
|
||||
{itemString, 14, `"b:ar"`},
|
||||
{itemRightBrace, 20, `}`},
|
||||
{itemLeftBracket, 21, `[`},
|
||||
{itemDuration, 22, `4m`},
|
||||
{itemColon, 24, `:`},
|
||||
{itemRightBracket, 25, `]`},
|
||||
},
|
||||
}, { // Nested Subquery.
|
||||
input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:])[4m:3s]`,
|
||||
expected: []item{
|
||||
|
||||
{itemIdentifier, 0, `min_over_time`},
|
||||
{itemLeftParen, 13, `(`},
|
||||
{itemIdentifier, 14, `rate`},
|
||||
{itemLeftParen, 18, `(`},
|
||||
{itemIdentifier, 19, `foo`},
|
||||
{itemLeftBrace, 22, `{`},
|
||||
{itemIdentifier, 23, `bar`},
|
||||
{itemEQL, 26, `=`},
|
||||
{itemString, 27, `"baz"`},
|
||||
{itemRightBrace, 32, `}`},
|
||||
{itemLeftBracket, 33, `[`},
|
||||
{itemDuration, 34, `2s`},
|
||||
{itemRightBracket, 36, `]`},
|
||||
{itemRightParen, 37, `)`},
|
||||
{itemLeftBracket, 38, `[`},
|
||||
{itemDuration, 39, `5m`},
|
||||
{itemColon, 41, `:`},
|
||||
{itemRightBracket, 42, `]`},
|
||||
{itemRightParen, 43, `)`},
|
||||
{itemLeftBracket, 44, `[`},
|
||||
{itemDuration, 45, `4m`},
|
||||
{itemColon, 47, `:`},
|
||||
{itemDuration, 48, `3s`},
|
||||
{itemRightBracket, 50, `]`},
|
||||
},
|
||||
},
|
||||
// Subquery with offset.
|
||||
{
|
||||
input: `test:name{on!~"b:ar"}[4m:4s] offset 10m`,
|
||||
expected: []item{
|
||||
{itemMetricIdentifier, 0, `test:name`},
|
||||
{itemLeftBrace, 9, `{`},
|
||||
{itemIdentifier, 10, `on`},
|
||||
{itemNEQRegex, 12, `!~`},
|
||||
{itemString, 14, `"b:ar"`},
|
||||
{itemRightBrace, 20, `}`},
|
||||
{itemLeftBracket, 21, `[`},
|
||||
{itemDuration, 22, `4m`},
|
||||
{itemColon, 24, `:`},
|
||||
{itemDuration, 25, `4s`},
|
||||
{itemRightBracket, 27, `]`},
|
||||
{itemOffset, 29, "offset"},
|
||||
{itemDuration, 36, "10m"},
|
||||
},
|
||||
}, {
|
||||
input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:] offset 6m)[4m:3s]`,
|
||||
expected: []item{
|
||||
|
||||
{itemIdentifier, 0, `min_over_time`},
|
||||
{itemLeftParen, 13, `(`},
|
||||
{itemIdentifier, 14, `rate`},
|
||||
{itemLeftParen, 18, `(`},
|
||||
{itemIdentifier, 19, `foo`},
|
||||
{itemLeftBrace, 22, `{`},
|
||||
{itemIdentifier, 23, `bar`},
|
||||
{itemEQL, 26, `=`},
|
||||
{itemString, 27, `"baz"`},
|
||||
{itemRightBrace, 32, `}`},
|
||||
{itemLeftBracket, 33, `[`},
|
||||
{itemDuration, 34, `2s`},
|
||||
{itemRightBracket, 36, `]`},
|
||||
{itemRightParen, 37, `)`},
|
||||
{itemLeftBracket, 38, `[`},
|
||||
{itemDuration, 39, `5m`},
|
||||
{itemColon, 41, `:`},
|
||||
{itemRightBracket, 42, `]`},
|
||||
{itemOffset, 44, `offset`},
|
||||
{itemDuration, 51, `6m`},
|
||||
{itemRightParen, 53, `)`},
|
||||
{itemLeftBracket, 54, `[`},
|
||||
{itemDuration, 55, `4m`},
|
||||
{itemColon, 57, `:`},
|
||||
{itemDuration, 58, `3s`},
|
||||
{itemRightBracket, 60, `]`},
|
||||
},
|
||||
},
|
||||
{
|
||||
input: `test:name{o:n!~"bar"}[4m:4s]`,
|
||||
fail: true,
|
||||
},
|
||||
{
|
||||
input: `test:name{on!~"bar"}[4m:4s:4h]`,
|
||||
fail: true,
|
||||
},
|
||||
{
|
||||
input: `test:name{on!~"bar"}[4m:4s:]`,
|
||||
fail: true,
|
||||
},
|
||||
{
|
||||
input: `test:name{on!~"bar"}[4m::]`,
|
||||
fail: true,
|
||||
},
|
||||
{
|
||||
input: `test:name{on!~"bar"}[:4s]`,
|
||||
fail: true,
|
||||
},
|
||||
}
|
||||
|
||||
// TestLexer tests basic functionality of the lexer. More elaborate tests are implemented
|
||||
|
|
|
@ -351,6 +351,17 @@ func (p *parser) expr() Expr {
|
|||
// If the next token is not an operator the expression is done.
|
||||
op := p.peek().typ
|
||||
if !op.isOperator() {
|
||||
// Check for subquery.
|
||||
if op == itemLeftBracket {
|
||||
expr = p.subqueryOrRangeSelector(expr, false)
|
||||
if s, ok := expr.(*SubqueryExpr); ok {
|
||||
// Parse optional offset.
|
||||
if p.peek().typ == itemOffset {
|
||||
offset := p.offset()
|
||||
s.Offset = offset
|
||||
}
|
||||
}
|
||||
}
|
||||
return expr
|
||||
}
|
||||
p.next() // Consume operator.
|
||||
|
@ -471,11 +482,7 @@ func (p *parser) unaryExpr() Expr {
|
|||
|
||||
// Expression might be followed by a range selector.
|
||||
if p.peek().typ == itemLeftBracket {
|
||||
vs, ok := e.(*VectorSelector)
|
||||
if !ok {
|
||||
p.errorf("range specification must be preceded by a metric selector, but follows a %T instead", e)
|
||||
}
|
||||
e = p.rangeSelector(vs)
|
||||
e = p.subqueryOrRangeSelector(e, true)
|
||||
}
|
||||
|
||||
// Parse optional offset.
|
||||
|
@ -487,6 +494,8 @@ func (p *parser) unaryExpr() Expr {
|
|||
s.Offset = offset
|
||||
case *MatrixSelector:
|
||||
s.Offset = offset
|
||||
case *SubqueryExpr:
|
||||
s.Offset = offset
|
||||
default:
|
||||
p.errorf("offset modifier must be preceded by an instant or range selector, but follows a %T instead", e)
|
||||
}
|
||||
|
@ -495,13 +504,17 @@ func (p *parser) unaryExpr() Expr {
|
|||
return e
|
||||
}
|
||||
|
||||
// rangeSelector parses a Matrix (a.k.a. range) selector based on a given
|
||||
// Vector selector.
|
||||
// subqueryOrRangeSelector parses a Subquery based on given Expr (or)
|
||||
// a Matrix (a.k.a. range) selector based on a given Vector selector.
|
||||
//
|
||||
// <Vector_selector> '[' <duration> ']'
|
||||
// <Vector_selector> '[' <duration> ']' | <Vector_selector> '[' <duration> ':' [<duration>] ']'
|
||||
//
|
||||
func (p *parser) rangeSelector(vs *VectorSelector) *MatrixSelector {
|
||||
const ctx = "range selector"
|
||||
func (p *parser) subqueryOrRangeSelector(expr Expr, checkRange bool) Expr {
|
||||
ctx := "subquery selector"
|
||||
if checkRange {
|
||||
ctx = "range/subquery selector"
|
||||
}
|
||||
|
||||
p.next()
|
||||
|
||||
var erange time.Duration
|
||||
|
@ -513,14 +526,43 @@ func (p *parser) rangeSelector(vs *VectorSelector) *MatrixSelector {
|
|||
p.error(err)
|
||||
}
|
||||
|
||||
p.expect(itemRightBracket, ctx)
|
||||
|
||||
e := &MatrixSelector{
|
||||
Name: vs.Name,
|
||||
LabelMatchers: vs.LabelMatchers,
|
||||
Range: erange,
|
||||
var itm item
|
||||
if checkRange {
|
||||
itm = p.expectOneOf(itemRightBracket, itemColon, ctx)
|
||||
if itm.typ == itemRightBracket {
|
||||
// Range selector.
|
||||
vs, ok := expr.(*VectorSelector)
|
||||
if !ok {
|
||||
p.errorf("range specification must be preceded by a metric selector, but follows a %T instead", expr)
|
||||
}
|
||||
return &MatrixSelector{
|
||||
Name: vs.Name,
|
||||
LabelMatchers: vs.LabelMatchers,
|
||||
Range: erange,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
itm = p.expect(itemColon, ctx)
|
||||
}
|
||||
|
||||
// Subquery.
|
||||
var estep time.Duration
|
||||
|
||||
itm = p.expectOneOf(itemRightBracket, itemDuration, ctx)
|
||||
if itm.typ == itemDuration {
|
||||
estepStr := itm.val
|
||||
estep, err = parseDuration(estepStr)
|
||||
if err != nil {
|
||||
p.error(err)
|
||||
}
|
||||
p.expect(itemRightBracket, ctx)
|
||||
}
|
||||
|
||||
return &SubqueryExpr{
|
||||
Expr: expr,
|
||||
Range: erange,
|
||||
Step: estep,
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// number parses a number.
|
||||
|
@ -1000,6 +1042,12 @@ func (p *parser) checkType(node Node) (typ ValueType) {
|
|||
p.errorf("unary expression only allowed on expressions of type scalar or instant vector, got %q", documentedType(t))
|
||||
}
|
||||
|
||||
case *SubqueryExpr:
|
||||
ty := p.checkType(n.Expr)
|
||||
if ty != ValueTypeVector {
|
||||
p.errorf("subquery is only allowed on instant vector, got %s in %q instead", ty, n.String())
|
||||
}
|
||||
|
||||
case *NumberLiteral, *MatrixSelector, *StringLiteral, *VectorSelector:
|
||||
// Nothing to do for terminals.
|
||||
|
||||
|
|
|
@ -942,10 +942,6 @@ var testExpr = []struct {
|
|||
input: `foo{__name__="bar"}`,
|
||||
fail: true,
|
||||
errMsg: "metric name must not be set twice: \"foo\" or \"bar\"",
|
||||
// }, {
|
||||
// input: `:foo`,
|
||||
// fail: true,
|
||||
// errMsg: "bla",
|
||||
},
|
||||
// Test matrix selector.
|
||||
{
|
||||
|
@ -1051,11 +1047,11 @@ var testExpr = []struct {
|
|||
}, {
|
||||
input: `some_metric OFFSET 1m[5m]`,
|
||||
fail: true,
|
||||
errMsg: "could not parse remaining input \"[5m]\"...",
|
||||
errMsg: "parse error at char 25: unexpected \"]\" in subquery selector, expected \":\"",
|
||||
}, {
|
||||
input: `(foo + bar)[5m]`,
|
||||
fail: true,
|
||||
errMsg: "could not parse remaining input \"[5m]\"...",
|
||||
errMsg: "parse error at char 15: unexpected \"]\" in subquery selector, expected \":\"",
|
||||
},
|
||||
// Test aggregation.
|
||||
{
|
||||
|
@ -1390,6 +1386,202 @@ var testExpr = []struct {
|
|||
fail: true,
|
||||
errMsg: "illegal character U+002E '.' in escape sequence",
|
||||
},
|
||||
// Subquery.
|
||||
{
|
||||
input: `foo{bar="baz"}[10m:6s]`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &VectorSelector{
|
||||
Name: "foo",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, "bar", "baz"),
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
Range: 10 * time.Minute,
|
||||
Step: 6 * time.Second,
|
||||
},
|
||||
}, {
|
||||
input: `foo[10m:]`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &VectorSelector{
|
||||
Name: "foo",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
Range: 10 * time.Minute,
|
||||
},
|
||||
}, {
|
||||
input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:5s])`,
|
||||
expected: &Call{
|
||||
Func: mustGetFunction("min_over_time"),
|
||||
Args: Expressions{
|
||||
&SubqueryExpr{
|
||||
Expr: &Call{
|
||||
Func: mustGetFunction("rate"),
|
||||
Args: Expressions{
|
||||
&MatrixSelector{
|
||||
Name: "foo",
|
||||
Range: 2 * time.Second,
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, "bar", "baz"),
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 5 * time.Minute,
|
||||
Step: 5 * time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, {
|
||||
input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:])[4m:3s]`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &Call{
|
||||
Func: mustGetFunction("min_over_time"),
|
||||
Args: Expressions{
|
||||
&SubqueryExpr{
|
||||
Expr: &Call{
|
||||
Func: mustGetFunction("rate"),
|
||||
Args: Expressions{
|
||||
&MatrixSelector{
|
||||
Name: "foo",
|
||||
Range: 2 * time.Second,
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, "bar", "baz"),
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 5 * time.Minute,
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 4 * time.Minute,
|
||||
Step: 3 * time.Second,
|
||||
},
|
||||
}, {
|
||||
input: `min_over_time(rate(foo{bar="baz"}[2s])[5m:] offset 4m)[4m:3s]`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &Call{
|
||||
Func: mustGetFunction("min_over_time"),
|
||||
Args: Expressions{
|
||||
&SubqueryExpr{
|
||||
Expr: &Call{
|
||||
Func: mustGetFunction("rate"),
|
||||
Args: Expressions{
|
||||
&MatrixSelector{
|
||||
Name: "foo",
|
||||
Range: 2 * time.Second,
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, "bar", "baz"),
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 5 * time.Minute,
|
||||
Offset: 4 * time.Minute,
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 4 * time.Minute,
|
||||
Step: 3 * time.Second,
|
||||
},
|
||||
}, {
|
||||
input: "sum without(and, by, avg, count, alert, annotations)(some_metric) [30m:10s]",
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &AggregateExpr{
|
||||
Op: itemSum,
|
||||
Without: true,
|
||||
Expr: &VectorSelector{
|
||||
Name: "some_metric",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "some_metric"),
|
||||
},
|
||||
},
|
||||
Grouping: []string{"and", "by", "avg", "count", "alert", "annotations"},
|
||||
},
|
||||
Range: 30 * time.Minute,
|
||||
Step: 10 * time.Second,
|
||||
},
|
||||
}, {
|
||||
input: `some_metric OFFSET 1m [10m:5s]`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &VectorSelector{
|
||||
Name: "some_metric",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "some_metric"),
|
||||
},
|
||||
Offset: 1 * time.Minute,
|
||||
},
|
||||
Range: 10 * time.Minute,
|
||||
Step: 5 * time.Second,
|
||||
},
|
||||
}, {
|
||||
input: `(foo + bar{nm="val"})[5m:]`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &ParenExpr{
|
||||
Expr: &BinaryExpr{
|
||||
Op: itemADD,
|
||||
VectorMatching: &VectorMatching{
|
||||
Card: CardOneToOne,
|
||||
},
|
||||
LHS: &VectorSelector{
|
||||
Name: "foo",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
RHS: &VectorSelector{
|
||||
Name: "bar",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, "nm", "val"),
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 5 * time.Minute,
|
||||
},
|
||||
}, {
|
||||
input: `(foo + bar{nm="val"})[5m:] offset 10m`,
|
||||
expected: &SubqueryExpr{
|
||||
Expr: &ParenExpr{
|
||||
Expr: &BinaryExpr{
|
||||
Op: itemADD,
|
||||
VectorMatching: &VectorMatching{
|
||||
Card: CardOneToOne,
|
||||
},
|
||||
LHS: &VectorSelector{
|
||||
Name: "foo",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "foo"),
|
||||
},
|
||||
},
|
||||
RHS: &VectorSelector{
|
||||
Name: "bar",
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
mustLabelMatcher(labels.MatchEqual, "nm", "val"),
|
||||
mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Range: 5 * time.Minute,
|
||||
Offset: 10 * time.Minute,
|
||||
},
|
||||
}, {
|
||||
input: "test[5d] OFFSET 10s [10m:5s]",
|
||||
fail: true,
|
||||
errMsg: "parse error at char 29: subquery is only allowed on instant vector, got matrix in \"test[5d] offset 10s[10m:5s]\"",
|
||||
}, {
|
||||
input: `(foo + bar{nm="val"})[5m:][10m:5s]`,
|
||||
fail: true,
|
||||
errMsg: "parse error at char 27: could not parse remaining input \"[10m:5s]\"...",
|
||||
},
|
||||
}
|
||||
|
||||
func TestParseExpressions(t *testing.T) {
|
||||
|
|
|
@ -62,6 +62,9 @@ func tree(node Node, level string) string {
|
|||
case *UnaryExpr:
|
||||
t += tree(n.Expr, level)
|
||||
|
||||
case *SubqueryExpr:
|
||||
t += tree(n.Expr, level)
|
||||
|
||||
case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector:
|
||||
// nothing to do
|
||||
|
||||
|
@ -149,6 +152,14 @@ func (node *MatrixSelector) String() string {
|
|||
return fmt.Sprintf("%s[%s]%s", vecSelector.String(), model.Duration(node.Range), offset)
|
||||
}
|
||||
|
||||
func (node *SubqueryExpr) String() string {
|
||||
step := ""
|
||||
if node.Step != 0 {
|
||||
step = fmt.Sprintf("%s", model.Duration(node.Step))
|
||||
}
|
||||
return fmt.Sprintf("%s[%s:%s]", node.Expr.String(), model.Duration(node.Range), step)
|
||||
}
|
||||
|
||||
func (node *NumberLiteral) String() string {
|
||||
return fmt.Sprint(node.Val)
|
||||
}
|
||||
|
|
|
@ -515,7 +515,7 @@ func (t *Test) clear() {
|
|||
Logger: nil,
|
||||
Reg: nil,
|
||||
MaxConcurrent: 20,
|
||||
MaxSamples: 1000,
|
||||
MaxSamples: 10000,
|
||||
Timeout: 100 * time.Second,
|
||||
}
|
||||
|
||||
|
|
113
promql/testdata/subquery.test
vendored
Normal file
113
promql/testdata/subquery.test
vendored
Normal file
|
@ -0,0 +1,113 @@
|
|||
load 10s
|
||||
metric 1 2
|
||||
|
||||
# Evaluation before 0s gets no sample.
|
||||
eval instant at 10s sum_over_time(metric[50s:10s])
|
||||
{} 3
|
||||
|
||||
eval instant at 10s sum_over_time(metric[50s:5s])
|
||||
{} 4
|
||||
|
||||
# Every evaluation yields the last value, i.e. 2
|
||||
eval instant at 5m sum_over_time(metric[50s:10s])
|
||||
{} 12
|
||||
|
||||
# Series becomes stale at 5m10s (5m after last sample)
|
||||
# Hence subquery gets a single sample at 6m-50s=5m10s.
|
||||
eval instant at 6m sum_over_time(metric[50s:10s])
|
||||
{} 2
|
||||
|
||||
eval instant at 10s rate(metric[20s:10s])
|
||||
{} 0.1
|
||||
|
||||
eval instant at 20s rate(metric[20s:5s])
|
||||
{} 0.05
|
||||
|
||||
clear
|
||||
|
||||
load 10s
|
||||
http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000
|
||||
http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000
|
||||
http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000
|
||||
http_requests{job="api-server", instance="1", group="canary"} 0+40x2000
|
||||
|
||||
eval instant at 8000s rate(http_requests{group=~"pro.*"}[1m:10s])
|
||||
{job="api-server", instance="0", group="production"} 1
|
||||
{job="api-server", instance="1", group="production"} 2
|
||||
|
||||
eval instant at 20000s avg_over_time(rate(http_requests[1m])[1m:1s])
|
||||
{job="api-server", instance="0", group="canary"} 8
|
||||
{job="api-server", instance="1", group="canary"} 4
|
||||
{job="api-server", instance="1", group="production"} 3
|
||||
{job="api-server", instance="0", group="production"} 3
|
||||
|
||||
clear
|
||||
|
||||
load 10s
|
||||
metric1 0+1x1000
|
||||
metric2 0+2x1000
|
||||
metric3 0+3x1000
|
||||
|
||||
eval instant at 1000s sum_over_time(metric1[30s:10s])
|
||||
{} 394
|
||||
|
||||
# This is (394*2 - 100), because other than the last 100 at 1000s,
|
||||
# everything else is repeated with the 5s step.
|
||||
eval instant at 1000s sum_over_time(metric1[30s:5s])
|
||||
{} 688
|
||||
|
||||
# Offset is aligned with the step.
|
||||
eval instant at 1010s sum_over_time(metric1[30s:10s] offset 10s)
|
||||
{} 394
|
||||
|
||||
# Same result for different offsets due to step alignment.
|
||||
eval instant at 1010s sum_over_time(metric1[30s:10s] offset 9s)
|
||||
{} 297
|
||||
|
||||
eval instant at 1010s sum_over_time(metric1[30s:10s] offset 7s)
|
||||
{} 297
|
||||
|
||||
eval instant at 1010s sum_over_time(metric1[30s:10s] offset 5s)
|
||||
{} 297
|
||||
|
||||
eval instant at 1010s sum_over_time(metric1[30s:10s] offset 3s)
|
||||
{} 297
|
||||
|
||||
# Nested subqueries
|
||||
eval instant at 1000s rate(sum_over_time(metric1[30s:10s])[50s:10s])
|
||||
{} 0.4
|
||||
|
||||
eval instant at 1000s rate(sum_over_time(metric2[30s:10s])[50s:10s])
|
||||
{} 0.8
|
||||
|
||||
eval instant at 1000s rate(sum_over_time(metric3[30s:10s])[50s:10s])
|
||||
{} 1.2
|
||||
|
||||
eval instant at 1000s rate(sum_over_time((metric1+metric2+metric3)[30s:10s])[30s:10s])
|
||||
{} 2.4
|
||||
|
||||
clear
|
||||
|
||||
# Fibonacci sequence, to ensure the rate is not constant.
|
||||
# Additional note: using subqueries unnecessarily is unwise.
|
||||
load 7s
|
||||
metric 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765 10946 17711 28657 46368 75025 121393 196418 317811 514229 832040 1346269 2178309 3524578 5702887 9227465 14930352 24157817 39088169 63245986 102334155 165580141 267914296 433494437 701408733 1134903170 1836311903 2971215073 4807526976 7778742049 12586269025 20365011074 32951280099 53316291173 86267571272 139583862445 225851433717 365435296162 591286729879 956722026041 1548008755920 2504730781961 4052739537881 6557470319842 10610209857723 17167680177565 27777890035288 44945570212853 72723460248141 117669030460994 190392490709135 308061521170129 498454011879264 806515533049393 1304969544928657 2111485077978050 3416454622906707 5527939700884757 8944394323791464 14472334024676221 23416728348467685 37889062373143906 61305790721611591 99194853094755497 160500643816367088 259695496911122585 420196140727489673 679891637638612258 1100087778366101931 1779979416004714189 2880067194370816120 4660046610375530309 7540113804746346429 12200160415121876738 19740274219868223167 31940434634990099905 51680708854858323072 83621143489848422977 135301852344706746049 218922995834555169026 354224848179261915075 573147844013817084101 927372692193078999176 1500520536206896083277 2427893228399975082453 3928413764606871165730 6356306993006846248183 10284720757613717413913 16641027750620563662096 26925748508234281076009 43566776258854844738105 70492524767089125814114 114059301025943970552219 184551825793033096366333 298611126818977066918552 483162952612010163284885 781774079430987230203437 1264937032042997393488322 2046711111473984623691759 3311648143516982017180081 5358359254990966640871840 8670007398507948658051921 14028366653498915298923761 22698374052006863956975682 36726740705505779255899443 59425114757512643212875125 96151855463018422468774568 155576970220531065681649693 251728825683549488150424261 407305795904080553832073954 659034621587630041982498215 1066340417491710595814572169 1725375039079340637797070384 2791715456571051233611642553 4517090495650391871408712937 7308805952221443105020355490 11825896447871834976429068427 19134702400093278081449423917 30960598847965113057878492344 50095301248058391139327916261 81055900096023504197206408605 131151201344081895336534324866 212207101440105399533740733471 343358302784187294870275058337 555565404224292694404015791808 898923707008479989274290850145 1454489111232772683678306641953 2353412818241252672952597492098 3807901929474025356630904134051 6161314747715278029583501626149 9969216677189303386214405760200 16130531424904581415797907386349 26099748102093884802012313146549 42230279526998466217810220532898 68330027629092351019822533679447 110560307156090817237632754212345 178890334785183168257455287891792 289450641941273985495088042104137 468340976726457153752543329995929 757791618667731139247631372100066 1226132595394188293000174702095995 1983924214061919432247806074196061 3210056809456107725247980776292056 5193981023518027157495786850488117 8404037832974134882743767626780173 13598018856492162040239554477268290 22002056689466296922983322104048463 35600075545958458963222876581316753 57602132235424755886206198685365216 93202207781383214849429075266681969 150804340016807970735635273952047185 244006547798191185585064349218729154 394810887814999156320699623170776339 638817435613190341905763972389505493 1033628323428189498226463595560281832 1672445759041379840132227567949787325 2706074082469569338358691163510069157 4378519841510949178490918731459856482 7084593923980518516849609894969925639 11463113765491467695340528626429782121 18547707689471986212190138521399707760
|
||||
|
||||
# Extrapolated from [3@21, 144@77]: (144 - 3) / (77 - 21)
|
||||
eval instant at 80s rate(metric[1m])
|
||||
{} 2.517857143
|
||||
|
||||
# No extrapolation, [2@20, 144@80]: (144 - 2) / 60
|
||||
eval instant at 80s rate(metric[1m:10s])
|
||||
{} 2.366666667
|
||||
|
||||
# Only one value between 10s and 20s, 2@14
|
||||
eval instant at 20s min_over_time(metric[10s])
|
||||
{} 2
|
||||
|
||||
# min(1@10, 2@20)
|
||||
eval instant at 20s min_over_time(metric[10s:10s])
|
||||
{} 1
|
||||
|
||||
eval instant at 20m min_over_time(rate(metric[5m])[20m:1m])
|
||||
{} 0.12119047619047618
|
|
@ -255,3 +255,68 @@ func (r *Result) String() string {
|
|||
}
|
||||
return r.Value.String()
|
||||
}
|
||||
|
||||
// StorageSeries simulates promql.Series as storage.Series.
|
||||
type StorageSeries struct {
|
||||
series Series
|
||||
}
|
||||
|
||||
// NewStorageSeries returns a StorageSeries fromfor series.
|
||||
func NewStorageSeries(series Series) *StorageSeries {
|
||||
return &StorageSeries{
|
||||
series: series,
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *StorageSeries) Labels() labels.Labels {
|
||||
return ss.series.Metric
|
||||
}
|
||||
|
||||
// Iterator returns a new iterator of the data of the series.
|
||||
func (ss *StorageSeries) Iterator() storage.SeriesIterator {
|
||||
return newStorageSeriesIterator(ss.series)
|
||||
}
|
||||
|
||||
type storageSeriesIterator struct {
|
||||
points []Point
|
||||
curr int
|
||||
}
|
||||
|
||||
func newStorageSeriesIterator(series Series) *storageSeriesIterator {
|
||||
return &storageSeriesIterator{
|
||||
points: series.Points,
|
||||
curr: -1,
|
||||
}
|
||||
}
|
||||
|
||||
func (ssi *storageSeriesIterator) Seek(t int64) bool {
|
||||
i := ssi.curr
|
||||
if i < 0 {
|
||||
i = 0
|
||||
}
|
||||
for ; i < len(ssi.points); i++ {
|
||||
if ssi.points[i].T >= t {
|
||||
ssi.curr = i
|
||||
return true
|
||||
}
|
||||
}
|
||||
ssi.curr = len(ssi.points) - 1
|
||||
return false
|
||||
}
|
||||
|
||||
func (ssi *storageSeriesIterator) At() (t int64, v float64) {
|
||||
p := ssi.points[ssi.curr]
|
||||
return p.T, p.V
|
||||
}
|
||||
|
||||
func (ssi *storageSeriesIterator) Next() bool {
|
||||
ssi.curr++
|
||||
if ssi.curr >= len(ssi.points) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (ssi *storageSeriesIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
|
233
rules/manager.go
233
rules/manager.go
|
@ -53,55 +53,6 @@ const (
|
|||
const namespace = "prometheus"
|
||||
|
||||
var (
|
||||
evalDuration = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_evaluation_duration_seconds",
|
||||
Help: "The duration for a rule to execute.",
|
||||
},
|
||||
)
|
||||
evalFailures = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_evaluation_failures_total",
|
||||
Help: "The total number of rule evaluation failures.",
|
||||
},
|
||||
)
|
||||
evalTotal = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_evaluations_total",
|
||||
Help: "The total number of rule evaluations.",
|
||||
},
|
||||
)
|
||||
iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_duration_seconds",
|
||||
Help: "The duration of rule group evaluations.",
|
||||
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
||||
})
|
||||
iterationsMissed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_iterations_missed_total",
|
||||
Help: "The total number of rule group evaluations missed due to slow rule group evaluation.",
|
||||
})
|
||||
iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_iterations_total",
|
||||
Help: "The total number of scheduled rule group evaluations, whether executed or missed.",
|
||||
})
|
||||
lastEvaluation = prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, "", "rule_group_last_evaluation_timestamp_seconds"),
|
||||
"The timestamp of the last rule group evaluation in seconds.",
|
||||
[]string{"rule_group"},
|
||||
nil,
|
||||
)
|
||||
lastDuration = prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, "", "rule_group_last_duration_seconds"),
|
||||
"The duration of the last rule group evaluation.",
|
||||
[]string{"rule_group"},
|
||||
nil,
|
||||
)
|
||||
groupInterval = prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, "", "rule_group_interval_seconds"),
|
||||
"The interval of a rule group.",
|
||||
|
@ -110,12 +61,88 @@ var (
|
|||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(iterationDuration)
|
||||
prometheus.MustRegister(iterationsScheduled)
|
||||
prometheus.MustRegister(iterationsMissed)
|
||||
prometheus.MustRegister(evalFailures)
|
||||
prometheus.MustRegister(evalDuration)
|
||||
// Metrics for rule evaluation.
|
||||
type Metrics struct {
|
||||
evalDuration prometheus.Summary
|
||||
evalFailures prometheus.Counter
|
||||
evalTotal prometheus.Counter
|
||||
iterationDuration prometheus.Summary
|
||||
iterationsMissed prometheus.Counter
|
||||
iterationsScheduled prometheus.Counter
|
||||
groupLastEvalTime *prometheus.GaugeVec
|
||||
groupLastDuration *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
// NewGroupMetrics makes a new Metrics and registers them with then provided registerer,
|
||||
// if not nil.
|
||||
func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
|
||||
m := &Metrics{
|
||||
evalDuration: prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_evaluation_duration_seconds",
|
||||
Help: "The duration for a rule to execute.",
|
||||
}),
|
||||
evalFailures: prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_evaluation_failures_total",
|
||||
Help: "The total number of rule evaluation failures.",
|
||||
}),
|
||||
evalTotal: prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_evaluations_total",
|
||||
Help: "The total number of rule evaluations.",
|
||||
}),
|
||||
iterationDuration: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_duration_seconds",
|
||||
Help: "The duration of rule group evaluations.",
|
||||
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
||||
}),
|
||||
iterationsMissed: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_iterations_missed_total",
|
||||
Help: "The total number of rule group evaluations missed due to slow rule group evaluation.",
|
||||
}),
|
||||
iterationsScheduled: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_iterations_total",
|
||||
Help: "The total number of scheduled rule group evaluations, whether executed or missed.",
|
||||
}),
|
||||
groupLastEvalTime: prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_last_evaluation_timestamp_seconds",
|
||||
Help: "The timestamp of the last rule group evaluation in seconds.",
|
||||
},
|
||||
[]string{"rule_group"},
|
||||
),
|
||||
groupLastDuration: prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Name: "rule_group_last_duration_seconds",
|
||||
Help: "The duration of the last rule group evaluation.",
|
||||
},
|
||||
[]string{"rule_group"},
|
||||
),
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(
|
||||
m.evalDuration,
|
||||
m.evalFailures,
|
||||
m.evalTotal,
|
||||
m.iterationDuration,
|
||||
m.iterationsMissed,
|
||||
m.iterationsScheduled,
|
||||
m.groupLastEvalTime,
|
||||
m.groupLastDuration,
|
||||
)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// QueryFunc processes PromQL queries.
|
||||
|
@ -165,8 +192,12 @@ type Rule interface {
|
|||
// Health returns the current health of the rule.
|
||||
Health() RuleHealth
|
||||
SetEvaluationDuration(time.Duration)
|
||||
// GetEvaluationDuration returns last evaluation duration.
|
||||
// NOTE: Used dynamically by rules.html template.
|
||||
GetEvaluationDuration() time.Duration
|
||||
SetEvaluationTimestamp(time.Time)
|
||||
// GetEvaluationTimestamp returns last evaluation timestamp.
|
||||
// NOTE: Used dynamically by rules.html template.
|
||||
GetEvaluationTimestamp() time.Time
|
||||
// HTMLSnippet returns a human-readable string representation of the rule,
|
||||
// decorated with HTML elements for use the web frontend.
|
||||
|
@ -191,10 +222,20 @@ type Group struct {
|
|||
terminated chan struct{}
|
||||
|
||||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// NewGroup makes a new Group with the given name, options, and rules.
|
||||
func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRestore bool, opts *ManagerOptions) *Group {
|
||||
metrics := opts.Metrics
|
||||
if metrics == nil {
|
||||
metrics = NewGroupMetrics(opts.Registerer)
|
||||
}
|
||||
|
||||
metrics.groupLastEvalTime.WithLabelValues(groupKey(file, name))
|
||||
metrics.groupLastDuration.WithLabelValues(groupKey(file, name))
|
||||
|
||||
return &Group{
|
||||
name: name,
|
||||
file: file,
|
||||
|
@ -206,6 +247,7 @@ func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRes
|
|||
done: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
logger: log.With(opts.Logger, "group", name),
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -233,15 +275,15 @@ func (g *Group) run(ctx context.Context) {
|
|||
}
|
||||
|
||||
iter := func() {
|
||||
iterationsScheduled.Inc()
|
||||
g.metrics.iterationsScheduled.Inc()
|
||||
|
||||
start := time.Now()
|
||||
g.Eval(ctx, evalTimestamp)
|
||||
timeSinceStart := time.Since(start)
|
||||
|
||||
iterationDuration.Observe(timeSinceStart.Seconds())
|
||||
g.SetEvaluationDuration(timeSinceStart)
|
||||
g.SetEvaluationTimestamp(start)
|
||||
g.metrics.iterationDuration.Observe(timeSinceStart.Seconds())
|
||||
g.setEvaluationDuration(timeSinceStart)
|
||||
g.setEvaluationTimestamp(start)
|
||||
}
|
||||
|
||||
// The assumption here is that since the ticker was started after having
|
||||
|
@ -262,8 +304,8 @@ func (g *Group) run(ctx context.Context) {
|
|||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / g.interval) - 1
|
||||
if missed > 0 {
|
||||
iterationsMissed.Add(float64(missed))
|
||||
iterationsScheduled.Add(float64(missed))
|
||||
g.metrics.iterationsMissed.Add(float64(missed))
|
||||
g.metrics.iterationsScheduled.Add(float64(missed))
|
||||
}
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
|
||||
iter()
|
||||
|
@ -284,8 +326,8 @@ func (g *Group) run(ctx context.Context) {
|
|||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / g.interval) - 1
|
||||
if missed > 0 {
|
||||
iterationsMissed.Add(float64(missed))
|
||||
iterationsScheduled.Add(float64(missed))
|
||||
g.metrics.iterationsMissed.Add(float64(missed))
|
||||
g.metrics.iterationsScheduled.Add(float64(missed))
|
||||
}
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
|
||||
iter()
|
||||
|
@ -314,20 +356,15 @@ func (g *Group) GetEvaluationDuration() time.Duration {
|
|||
return g.evaluationDuration
|
||||
}
|
||||
|
||||
// SetEvaluationDuration sets the time in seconds the last evaluation took.
|
||||
func (g *Group) SetEvaluationDuration(dur time.Duration) {
|
||||
// setEvaluationDuration sets the time in seconds the last evaluation took.
|
||||
func (g *Group) setEvaluationDuration(dur time.Duration) {
|
||||
g.metrics.groupLastDuration.WithLabelValues(groupKey(g.file, g.name)).Set(float64(dur))
|
||||
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.evaluationDuration = dur
|
||||
}
|
||||
|
||||
// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
|
||||
func (g *Group) SetEvaluationTimestamp(ts time.Time) {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.evaluationTimestamp = ts
|
||||
}
|
||||
|
||||
// GetEvaluationTimestamp returns the time the last evaluation of the rule group took place.
|
||||
func (g *Group) GetEvaluationTimestamp() time.Time {
|
||||
g.mtx.Lock()
|
||||
|
@ -335,6 +372,15 @@ func (g *Group) GetEvaluationTimestamp() time.Time {
|
|||
return g.evaluationTimestamp
|
||||
}
|
||||
|
||||
// setEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
|
||||
func (g *Group) setEvaluationTimestamp(ts time.Time) {
|
||||
g.metrics.groupLastEvalTime.WithLabelValues(groupKey(g.file, g.name)).Set(float64(ts.Second()))
|
||||
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.evaluationTimestamp = ts
|
||||
}
|
||||
|
||||
// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
|
||||
func (g *Group) evalTimestamp() time.Time {
|
||||
var (
|
||||
|
@ -399,12 +445,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
sp.SetTag("name", rule.Name())
|
||||
defer func(t time.Time) {
|
||||
sp.Finish()
|
||||
evalDuration.Observe(time.Since(t).Seconds())
|
||||
rule.SetEvaluationDuration(time.Since(t))
|
||||
|
||||
since := time.Since(t)
|
||||
g.metrics.evalDuration.Observe(since.Seconds())
|
||||
rule.SetEvaluationDuration(since)
|
||||
rule.SetEvaluationTimestamp(t)
|
||||
}(time.Now())
|
||||
|
||||
evalTotal.Inc()
|
||||
g.metrics.evalTotal.Inc()
|
||||
|
||||
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
|
||||
if err != nil {
|
||||
|
@ -413,7 +461,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
if _, ok := err.(promql.ErrQueryCanceled); !ok {
|
||||
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
|
||||
}
|
||||
evalFailures.Inc()
|
||||
g.metrics.evalFailures.Inc()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -490,7 +538,11 @@ func (g *Group) RestoreForState(ts time.Time) {
|
|||
level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err)
|
||||
return
|
||||
}
|
||||
defer q.Close()
|
||||
defer func() {
|
||||
if err := q.Close(); err != nil {
|
||||
level.Error(g.logger).Log("msg", "Failed to close Querier", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, rule := range g.Rules() {
|
||||
alertRule, ok := rule.(*AlertingRule)
|
||||
|
@ -634,20 +686,29 @@ type ManagerOptions struct {
|
|||
OutageTolerance time.Duration
|
||||
ForGracePeriod time.Duration
|
||||
ResendDelay time.Duration
|
||||
|
||||
Metrics *Metrics
|
||||
}
|
||||
|
||||
// NewManager returns an implementation of Manager, ready to be started
|
||||
// by calling the Run method.
|
||||
func NewManager(o *ManagerOptions) *Manager {
|
||||
if o.Metrics == nil {
|
||||
o.Metrics = NewGroupMetrics(o.Registerer)
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
groups: map[string]*Group{},
|
||||
opts: o,
|
||||
block: make(chan struct{}),
|
||||
logger: o.Logger,
|
||||
}
|
||||
|
||||
if o.Registerer != nil {
|
||||
o.Registerer.MustRegister(m)
|
||||
}
|
||||
|
||||
o.Metrics.iterationsMissed.Inc()
|
||||
return m
|
||||
}
|
||||
|
||||
|
@ -825,29 +886,11 @@ func (m *Manager) AlertingRules() []*AlertingRule {
|
|||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (m *Manager) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- lastEvaluation
|
||||
ch <- lastDuration
|
||||
ch <- groupInterval
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (m *Manager) Collect(ch chan<- prometheus.Metric) {
|
||||
for _, g := range m.RuleGroups() {
|
||||
lastEvaluationTime := g.GetEvaluationTimestamp()
|
||||
lastEvaluationTimestamp := math.Inf(-1)
|
||||
if !lastEvaluationTime.IsZero() {
|
||||
lastEvaluationTimestamp = float64(lastEvaluationTime.UnixNano()) / 1e9
|
||||
}
|
||||
key := groupKey(g.file, g.name)
|
||||
ch <- prometheus.MustNewConstMetric(lastEvaluation,
|
||||
prometheus.GaugeValue,
|
||||
lastEvaluationTimestamp,
|
||||
key)
|
||||
ch <- prometheus.MustNewConstMetric(lastDuration,
|
||||
prometheus.GaugeValue,
|
||||
g.GetEvaluationDuration().Seconds(),
|
||||
key)
|
||||
}
|
||||
for _, g := range m.RuleGroups() {
|
||||
ch <- prometheus.MustNewConstMetric(groupInterval,
|
||||
prometheus.GaugeValue,
|
||||
|
|
|
@ -538,7 +538,7 @@ func TestStaleness(t *testing.T) {
|
|||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
set, err, _ := querier.Select(nil, matcher)
|
||||
set, _, err := querier.Select(nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
samples, err := readSeriesSet(set)
|
||||
|
|
|
@ -228,11 +228,11 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
|
|||
}
|
||||
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error, Warnings) {
|
||||
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
||||
var warnings Warnings
|
||||
for _, querier := range q.queriers {
|
||||
set, err, wrn := querier.Select(params, matchers...)
|
||||
set, wrn, err := querier.Select(params, matchers...)
|
||||
q.setQuerierMap[set] = querier
|
||||
if wrn != nil {
|
||||
warnings = append(warnings, wrn...)
|
||||
|
@ -244,12 +244,12 @@ func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher)
|
|||
warnings = append(warnings, err)
|
||||
continue
|
||||
} else {
|
||||
return nil, err, nil
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
seriesSets = append(seriesSets, set)
|
||||
}
|
||||
return NewMergeSeriesSet(seriesSets, q), nil, warnings
|
||||
return NewMergeSeriesSet(seriesSets, q), warnings, nil
|
||||
}
|
||||
|
||||
// LabelValues returns all potential values for a label name.
|
||||
|
|
|
@ -52,7 +52,7 @@ type Queryable interface {
|
|||
// Querier provides reading access to time series data.
|
||||
type Querier interface {
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings)
|
||||
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
|
||||
|
||||
// LabelValues returns all potential values for a label name.
|
||||
LabelValues(name string) ([]string, error)
|
||||
|
|
|
@ -26,7 +26,7 @@ func NoopQuerier() Querier {
|
|||
return noopQuerier{}
|
||||
}
|
||||
|
||||
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings) {
|
||||
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
return NoopSeriesSet(), nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -59,10 +59,10 @@ type querier struct {
|
|||
|
||||
// Select implements storage.Querier and uses the given matchers to read series
|
||||
// sets from the Client.
|
||||
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
|
||||
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
query, err := ToQuery(q.mint, q.maxt, matchers, p)
|
||||
if err != nil {
|
||||
return nil, err, nil
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name())
|
||||
|
@ -71,7 +71,7 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (
|
|||
|
||||
res, err := q.client.Read(q.ctx, query)
|
||||
if err != nil {
|
||||
return nil, err, nil
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return FromQueryResult(res), nil, nil
|
||||
|
@ -117,13 +117,13 @@ type externalLabelsQuerier struct {
|
|||
// Select adds equality matchers for all external labels to the list of matchers
|
||||
// before calling the wrapped storage.Queryable. The added external labels are
|
||||
// removed from the returned series sets.
|
||||
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
|
||||
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
m, added := q.addExternalLabels(matchers)
|
||||
s, err, warnings := q.Querier.Select(p, m...)
|
||||
s, warnings, err := q.Querier.Select(p, m...)
|
||||
if err != nil {
|
||||
return nil, err, warnings
|
||||
return nil, warnings, err
|
||||
}
|
||||
return newSeriesSetFilter(s, added), nil, warnings
|
||||
return newSeriesSetFilter(s, added), warnings, nil
|
||||
}
|
||||
|
||||
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
|
||||
|
@ -170,7 +170,7 @@ type requiredMatchersQuerier struct {
|
|||
|
||||
// Select returns a NoopSeriesSet if the given matchers don't match the label
|
||||
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
|
||||
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
|
||||
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
ms := q.requiredMatchers
|
||||
for _, m := range matchers {
|
||||
for i, r := range ms {
|
||||
|
|
|
@ -42,7 +42,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
|
|||
externalLabels: model.LabelSet{"region": "europe"},
|
||||
}
|
||||
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
|
||||
have, err, _ := q.Select(nil, matchers...)
|
||||
have, _, err := q.Select(nil, matchers...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ type mockSeriesSet struct {
|
|||
storage.SeriesSet
|
||||
}
|
||||
|
||||
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
|
||||
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return mockSeriesSet{}, nil, nil
|
||||
}
|
||||
|
||||
|
@ -313,7 +313,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
|
|||
requiredMatchers: test.requiredMatchers,
|
||||
}
|
||||
|
||||
have, err, _ := q.Select(nil, test.matchers...)
|
||||
have, _, err := q.Select(nil, test.matchers...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
// Callback func that return the oldest timestamp stored in a storage.
|
||||
// startTimeCallback is a callback func that return the oldest timestamp stored in a storage.
|
||||
type startTimeCallback func() (int64, error)
|
||||
|
||||
// Storage represents all the remote read and write endpoints. It implements
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/alecthomas/units"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -117,6 +118,9 @@ type Options struct {
|
|||
// The maximum timestamp range of compacted blocks.
|
||||
MaxBlockDuration model.Duration
|
||||
|
||||
// The maximum size of each WAL segment file.
|
||||
WALSegmentSize units.Base2Bytes
|
||||
|
||||
// Duration for how long to retain data.
|
||||
Retention model.Duration
|
||||
|
||||
|
@ -182,6 +186,7 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
|
|||
|
||||
db, err := tsdb.Open(path, l, r, &tsdb.Options{
|
||||
WALFlushInterval: 10 * time.Second,
|
||||
WALSegmentSize: int(opts.WALSegmentSize),
|
||||
RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000),
|
||||
BlockRanges: rngs,
|
||||
NoLockfile: opts.NoLockfile,
|
||||
|
@ -230,7 +235,7 @@ type querier struct {
|
|||
q tsdb.Querier
|
||||
}
|
||||
|
||||
func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
|
||||
func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
ms := make([]tsdbLabels.Matcher, 0, len(oms))
|
||||
|
||||
for _, om := range oms {
|
||||
|
@ -238,7 +243,7 @@ func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storag
|
|||
}
|
||||
set, err := q.q.Select(ms...)
|
||||
if err != nil {
|
||||
return nil, err, nil
|
||||
return nil, nil, err
|
||||
}
|
||||
return seriesSet{set: set}, nil, nil
|
||||
}
|
||||
|
|
3
vendor/github.com/prometheus/tsdb/CHANGELOG.md
generated
vendored
3
vendor/github.com/prometheus/tsdb/CHANGELOG.md
generated
vendored
|
@ -1,11 +1,10 @@
|
|||
## master / unreleased
|
||||
|
||||
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
|
||||
|
||||
## 0.3.1
|
||||
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.
|
||||
|
||||
## 0.3.0
|
||||
|
||||
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
|
||||
- [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct.
|
||||
- [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
|
||||
|
|
10
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
10
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
|
@ -45,6 +45,7 @@ import (
|
|||
// millisecond precision timestamps.
|
||||
var DefaultOptions = &Options{
|
||||
WALFlushInterval: 5 * time.Second,
|
||||
WALSegmentSize: wal.DefaultSegmentSize,
|
||||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
|
||||
NoLockfile: false,
|
||||
|
@ -55,6 +56,9 @@ type Options struct {
|
|||
// The interval at which the write ahead log is flushed to disk.
|
||||
WALFlushInterval time.Duration
|
||||
|
||||
// Segments (wal files) max size
|
||||
WALSegmentSize int
|
||||
|
||||
// Duration of persisted data to keep.
|
||||
RetentionDuration uint64
|
||||
|
||||
|
@ -263,7 +267,11 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
return nil, errors.Wrap(err, "create leveled compactor")
|
||||
}
|
||||
|
||||
wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
|
||||
segmentSize := wal.DefaultSegmentSize
|
||||
if opts.WALSegmentSize > 0 {
|
||||
segmentSize = opts.WALSegmentSize
|
||||
}
|
||||
wlog, err := wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
12
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
12
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
|
@ -89,6 +89,7 @@ type headMetrics struct {
|
|||
maxTime prometheus.GaugeFunc
|
||||
samplesAppended prometheus.Counter
|
||||
walTruncateDuration prometheus.Summary
|
||||
walCorruptionsTotal prometheus.Counter
|
||||
headTruncateFail prometheus.Counter
|
||||
headTruncateTotal prometheus.Counter
|
||||
checkpointDeleteFail prometheus.Counter
|
||||
|
@ -152,6 +153,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
|
||||
Help: "Duration of WAL truncation.",
|
||||
})
|
||||
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_wal_corruptions_total",
|
||||
Help: "Total number of WAL corruptions.",
|
||||
})
|
||||
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_head_samples_appended_total",
|
||||
Help: "Total number of appended samples.",
|
||||
|
@ -195,6 +200,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
m.maxTime,
|
||||
m.gcDuration,
|
||||
m.walTruncateDuration,
|
||||
m.walCorruptionsTotal,
|
||||
m.samplesAppended,
|
||||
m.headTruncateFail,
|
||||
m.headTruncateTotal,
|
||||
|
@ -480,10 +486,10 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
return nil
|
||||
}
|
||||
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
|
||||
h.metrics.walCorruptionsTotal.Inc()
|
||||
if err := h.wal.Repair(err); err != nil {
|
||||
return errors.Wrap(err, "repair corrupted WAL")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -500,7 +506,7 @@ func (h *Head) Truncate(mint int64) (err error) {
|
|||
return nil
|
||||
}
|
||||
atomic.StoreInt64(&h.minTime, mint)
|
||||
h.minValidTime = mint
|
||||
atomic.StoreInt64(&h.minValidTime, mint)
|
||||
|
||||
// Ensure that max time is at least as high as min time.
|
||||
for h.MaxTime() < mint {
|
||||
|
@ -656,7 +662,7 @@ func (h *Head) appender() *headAppender {
|
|||
head: h,
|
||||
// Set the minimum valid time to whichever is greater the head min valid time or the compaciton window.
|
||||
// This ensures that no samples will be added within the compaction window to avoid races.
|
||||
minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2),
|
||||
minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
|
||||
mint: math.MaxInt64,
|
||||
maxt: math.MinInt64,
|
||||
samples: h.getAppendBuffer(),
|
||||
|
|
4
vendor/github.com/prometheus/tsdb/wal/wal.go
generated
vendored
4
vendor/github.com/prometheus/tsdb/wal/wal.go
generated
vendored
|
@ -35,7 +35,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
|
||||
DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB
|
||||
pageSize = 32 * 1024 // 32KB
|
||||
recordHeaderSize = 7
|
||||
)
|
||||
|
@ -174,7 +174,7 @@ type WAL struct {
|
|||
|
||||
// New returns a new WAL over the given directory.
|
||||
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
|
||||
return NewSize(logger, reg, dir, defaultSegmentSize)
|
||||
return NewSize(logger, reg, dir, DefaultSegmentSize)
|
||||
}
|
||||
|
||||
// NewSize returns a new WAL over the given directory.
|
||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -187,7 +187,7 @@ github.com/prometheus/procfs
|
|||
github.com/prometheus/procfs/nfs
|
||||
github.com/prometheus/procfs/xfs
|
||||
github.com/prometheus/procfs/internal/util
|
||||
# github.com/prometheus/tsdb v0.3.1
|
||||
# github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc
|
||||
github.com/prometheus/tsdb
|
||||
github.com/prometheus/tsdb/labels
|
||||
github.com/prometheus/tsdb/chunkenc
|
||||
|
|
|
@ -488,7 +488,7 @@ func (api *API) series(r *http.Request) apiFuncResult {
|
|||
var sets []storage.SeriesSet
|
||||
var warnings storage.Warnings
|
||||
for _, mset := range matcherSets {
|
||||
s, err, wrn := q.Select(nil, mset...) //TODO
|
||||
s, wrn, err := q.Select(nil, mset...) //TODO
|
||||
warnings = append(warnings, wrn...)
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
|
||||
|
@ -883,7 +883,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
set, err, _ := querier.Select(selectParams, filteredMatchers...)
|
||||
set, _, err := querier.Select(selectParams, filteredMatchers...)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
|
|
@ -379,7 +379,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
|
|||
}
|
||||
defer querier.Close()
|
||||
|
||||
set, err, _ := querier.Select(selectParams, matchers...)
|
||||
set, _, err := querier.Select(selectParams, matchers...)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
|
|
@ -87,7 +87,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
|
||||
var sets []storage.SeriesSet
|
||||
for _, mset := range matcherSets {
|
||||
s, err, wrns := q.Select(params, mset...)
|
||||
s, wrns, err := q.Select(params, mset...)
|
||||
if wrns != nil {
|
||||
level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
|
||||
federationErrors.Add(float64(len(wrns)))
|
||||
|
|
Loading…
Reference in a new issue