Refactor query evaluation.

This copies the evaluation logic from the current rules/ package.
The new engine handles the execution process from query string to final result.
It provides query timeout and cancellation and general flexibility for
future changes.

functions.go: Add evaluation implementation. Slight changes to in/out data but
	not to the processing logic.
quantile.go: No changes.
analyzer.go: No changes.
engine.go: Actually new part. Mainly consists of evaluation methods
	which were not changed.
setup_test.go: Copy of rules/helpers_test.go to setup test storage.
promql_test.go: Copy of rules/rules_test.go.
This commit is contained in:
Fabian Reinartz 2015-03-30 19:13:36 +02:00
parent 089f019660
commit 5602328c7c
15 changed files with 5561 additions and 34 deletions

4
Godeps/Godeps.json generated
View file

@ -7,6 +7,10 @@
"Comment": "null-5", "Comment": "null-5",
"Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675" "Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675"
}, },
{
"ImportPath": "github.com/prometheus/procfs",
"Rev": "92faa308558161acab0ada1db048e9996ecec160"
},
{ {
"ImportPath": "github.com/beorn7/perks/quantile", "ImportPath": "github.com/beorn7/perks/quantile",
"Rev": "b965b613227fddccbfffe13eae360ed3fa822f8d" "Rev": "b965b613227fddccbfffe13eae360ed3fa822f8d"

View file

@ -0,0 +1,447 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package context defines the Context type, which carries deadlines,
// cancelation signals, and other request-scoped values across API boundaries
// and between processes.
//
// Incoming requests to a server should create a Context, and outgoing calls to
// servers should accept a Context. The chain of function calls between must
// propagate the Context, optionally replacing it with a modified copy created
// using WithDeadline, WithTimeout, WithCancel, or WithValue.
//
// Programs that use Contexts should follow these rules to keep interfaces
// consistent across packages and enable static analysis tools to check context
// propagation:
//
// Do not store Contexts inside a struct type; instead, pass a Context
// explicitly to each function that needs it. The Context should be the first
// parameter, typically named ctx:
//
// func DoSomething(ctx context.Context, arg Arg) error {
// // ... use ctx ...
// }
//
// Do not pass a nil Context, even if a function permits it. Pass context.TODO
// if you are unsure about which Context to use.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The same Context may be passed to functions running in different goroutines;
// Contexts are safe for simultaneous use by multiple goroutines.
//
// See http://blog.golang.org/context for example code for a server that uses
// Contexts.
package context
import (
"errors"
"fmt"
"sync"
"time"
)
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// WithCancel arranges for Done to be closed when cancel is called;
// WithDeadline arranges for Done to be closed when the deadline
// expires; WithTimeout arranges for Done to be closed when the timeout
// elapses.
//
// Done is provided for use in select statements:
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out <-chan Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See http://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancelation.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed. Err returns
// Canceled if the context was canceled or DeadlineExceeded if the
// context's deadline passed. No other values for Err are defined.
// After Done is closed, successive calls to Err return the same value.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
//
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
//
// Packages that define a Context key should provide type-safe accessors
// for the values stores using that key:
//
// // Package user defines a User type that's stored in Contexts.
// package user
//
// import "golang.org/x/net/context"
//
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
//
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key = 0
//
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = errors.New("context deadline exceeded")
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it's is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter). TODO is recognized by static analysis tools that determine
// whether Contexts are propagated correctly in a program.
func TODO() Context {
return todo
}
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
done chan struct{} // closed by the first cancel call.
mu sync.Mutex
children map[canceler]bool // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
return c.done
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
close(c.done)
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

View file

@ -0,0 +1,575 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package context
import (
"fmt"
"math/rand"
"runtime"
"strings"
"sync"
"testing"
"time"
)
// otherContext is a Context that's not one of the types defined in context.go.
// This lets us test code paths that differ based on the underlying type of the
// Context.
type otherContext struct {
Context
}
func TestBackground(t *testing.T) {
c := Background()
if c == nil {
t.Fatalf("Background returned nil")
}
select {
case x := <-c.Done():
t.Errorf("<-c.Done() == %v want nothing (it should block)", x)
default:
}
if got, want := fmt.Sprint(c), "context.Background"; got != want {
t.Errorf("Background().String() = %q want %q", got, want)
}
}
func TestTODO(t *testing.T) {
c := TODO()
if c == nil {
t.Fatalf("TODO returned nil")
}
select {
case x := <-c.Done():
t.Errorf("<-c.Done() == %v want nothing (it should block)", x)
default:
}
if got, want := fmt.Sprint(c), "context.TODO"; got != want {
t.Errorf("TODO().String() = %q want %q", got, want)
}
}
func TestWithCancel(t *testing.T) {
c1, cancel := WithCancel(Background())
if got, want := fmt.Sprint(c1), "context.Background.WithCancel"; got != want {
t.Errorf("c1.String() = %q want %q", got, want)
}
o := otherContext{c1}
c2, _ := WithCancel(o)
contexts := []Context{c1, o, c2}
for i, c := range contexts {
if d := c.Done(); d == nil {
t.Errorf("c[%d].Done() == %v want non-nil", i, d)
}
if e := c.Err(); e != nil {
t.Errorf("c[%d].Err() == %v want nil", i, e)
}
select {
case x := <-c.Done():
t.Errorf("<-c.Done() == %v want nothing (it should block)", x)
default:
}
}
cancel()
time.Sleep(100 * time.Millisecond) // let cancelation propagate
for i, c := range contexts {
select {
case <-c.Done():
default:
t.Errorf("<-c[%d].Done() blocked, but shouldn't have", i)
}
if e := c.Err(); e != Canceled {
t.Errorf("c[%d].Err() == %v want %v", i, e, Canceled)
}
}
}
func TestParentFinishesChild(t *testing.T) {
// Context tree:
// parent -> cancelChild
// parent -> valueChild -> timerChild
parent, cancel := WithCancel(Background())
cancelChild, stop := WithCancel(parent)
defer stop()
valueChild := WithValue(parent, "key", "value")
timerChild, stop := WithTimeout(valueChild, 10000*time.Hour)
defer stop()
select {
case x := <-parent.Done():
t.Errorf("<-parent.Done() == %v want nothing (it should block)", x)
case x := <-cancelChild.Done():
t.Errorf("<-cancelChild.Done() == %v want nothing (it should block)", x)
case x := <-timerChild.Done():
t.Errorf("<-timerChild.Done() == %v want nothing (it should block)", x)
case x := <-valueChild.Done():
t.Errorf("<-valueChild.Done() == %v want nothing (it should block)", x)
default:
}
// The parent's children should contain the two cancelable children.
pc := parent.(*cancelCtx)
cc := cancelChild.(*cancelCtx)
tc := timerChild.(*timerCtx)
pc.mu.Lock()
if len(pc.children) != 2 || !pc.children[cc] || !pc.children[tc] {
t.Errorf("bad linkage: pc.children = %v, want %v and %v",
pc.children, cc, tc)
}
pc.mu.Unlock()
if p, ok := parentCancelCtx(cc.Context); !ok || p != pc {
t.Errorf("bad linkage: parentCancelCtx(cancelChild.Context) = %v, %v want %v, true", p, ok, pc)
}
if p, ok := parentCancelCtx(tc.Context); !ok || p != pc {
t.Errorf("bad linkage: parentCancelCtx(timerChild.Context) = %v, %v want %v, true", p, ok, pc)
}
cancel()
pc.mu.Lock()
if len(pc.children) != 0 {
t.Errorf("pc.cancel didn't clear pc.children = %v", pc.children)
}
pc.mu.Unlock()
// parent and children should all be finished.
check := func(ctx Context, name string) {
select {
case <-ctx.Done():
default:
t.Errorf("<-%s.Done() blocked, but shouldn't have", name)
}
if e := ctx.Err(); e != Canceled {
t.Errorf("%s.Err() == %v want %v", name, e, Canceled)
}
}
check(parent, "parent")
check(cancelChild, "cancelChild")
check(valueChild, "valueChild")
check(timerChild, "timerChild")
// WithCancel should return a canceled context on a canceled parent.
precanceledChild := WithValue(parent, "key", "value")
select {
case <-precanceledChild.Done():
default:
t.Errorf("<-precanceledChild.Done() blocked, but shouldn't have")
}
if e := precanceledChild.Err(); e != Canceled {
t.Errorf("precanceledChild.Err() == %v want %v", e, Canceled)
}
}
func TestChildFinishesFirst(t *testing.T) {
cancelable, stop := WithCancel(Background())
defer stop()
for _, parent := range []Context{Background(), cancelable} {
child, cancel := WithCancel(parent)
select {
case x := <-parent.Done():
t.Errorf("<-parent.Done() == %v want nothing (it should block)", x)
case x := <-child.Done():
t.Errorf("<-child.Done() == %v want nothing (it should block)", x)
default:
}
cc := child.(*cancelCtx)
pc, pcok := parent.(*cancelCtx) // pcok == false when parent == Background()
if p, ok := parentCancelCtx(cc.Context); ok != pcok || (ok && pc != p) {
t.Errorf("bad linkage: parentCancelCtx(cc.Context) = %v, %v want %v, %v", p, ok, pc, pcok)
}
if pcok {
pc.mu.Lock()
if len(pc.children) != 1 || !pc.children[cc] {
t.Errorf("bad linkage: pc.children = %v, cc = %v", pc.children, cc)
}
pc.mu.Unlock()
}
cancel()
if pcok {
pc.mu.Lock()
if len(pc.children) != 0 {
t.Errorf("child's cancel didn't remove self from pc.children = %v", pc.children)
}
pc.mu.Unlock()
}
// child should be finished.
select {
case <-child.Done():
default:
t.Errorf("<-child.Done() blocked, but shouldn't have")
}
if e := child.Err(); e != Canceled {
t.Errorf("child.Err() == %v want %v", e, Canceled)
}
// parent should not be finished.
select {
case x := <-parent.Done():
t.Errorf("<-parent.Done() == %v want nothing (it should block)", x)
default:
}
if e := parent.Err(); e != nil {
t.Errorf("parent.Err() == %v want nil", e)
}
}
}
func testDeadline(c Context, wait time.Duration, t *testing.T) {
select {
case <-time.After(wait):
t.Fatalf("context should have timed out")
case <-c.Done():
}
if e := c.Err(); e != DeadlineExceeded {
t.Errorf("c.Err() == %v want %v", e, DeadlineExceeded)
}
}
func TestDeadline(t *testing.T) {
c, _ := WithDeadline(Background(), time.Now().Add(100*time.Millisecond))
if got, prefix := fmt.Sprint(c), "context.Background.WithDeadline("; !strings.HasPrefix(got, prefix) {
t.Errorf("c.String() = %q want prefix %q", got, prefix)
}
testDeadline(c, 200*time.Millisecond, t)
c, _ = WithDeadline(Background(), time.Now().Add(100*time.Millisecond))
o := otherContext{c}
testDeadline(o, 200*time.Millisecond, t)
c, _ = WithDeadline(Background(), time.Now().Add(100*time.Millisecond))
o = otherContext{c}
c, _ = WithDeadline(o, time.Now().Add(300*time.Millisecond))
testDeadline(c, 200*time.Millisecond, t)
}
func TestTimeout(t *testing.T) {
c, _ := WithTimeout(Background(), 100*time.Millisecond)
if got, prefix := fmt.Sprint(c), "context.Background.WithDeadline("; !strings.HasPrefix(got, prefix) {
t.Errorf("c.String() = %q want prefix %q", got, prefix)
}
testDeadline(c, 200*time.Millisecond, t)
c, _ = WithTimeout(Background(), 100*time.Millisecond)
o := otherContext{c}
testDeadline(o, 200*time.Millisecond, t)
c, _ = WithTimeout(Background(), 100*time.Millisecond)
o = otherContext{c}
c, _ = WithTimeout(o, 300*time.Millisecond)
testDeadline(c, 200*time.Millisecond, t)
}
func TestCanceledTimeout(t *testing.T) {
c, _ := WithTimeout(Background(), 200*time.Millisecond)
o := otherContext{c}
c, cancel := WithTimeout(o, 400*time.Millisecond)
cancel()
time.Sleep(100 * time.Millisecond) // let cancelation propagate
select {
case <-c.Done():
default:
t.Errorf("<-c.Done() blocked, but shouldn't have")
}
if e := c.Err(); e != Canceled {
t.Errorf("c.Err() == %v want %v", e, Canceled)
}
}
type key1 int
type key2 int
var k1 = key1(1)
var k2 = key2(1) // same int as k1, different type
var k3 = key2(3) // same type as k2, different int
func TestValues(t *testing.T) {
check := func(c Context, nm, v1, v2, v3 string) {
if v, ok := c.Value(k1).(string); ok == (len(v1) == 0) || v != v1 {
t.Errorf(`%s.Value(k1).(string) = %q, %t want %q, %t`, nm, v, ok, v1, len(v1) != 0)
}
if v, ok := c.Value(k2).(string); ok == (len(v2) == 0) || v != v2 {
t.Errorf(`%s.Value(k2).(string) = %q, %t want %q, %t`, nm, v, ok, v2, len(v2) != 0)
}
if v, ok := c.Value(k3).(string); ok == (len(v3) == 0) || v != v3 {
t.Errorf(`%s.Value(k3).(string) = %q, %t want %q, %t`, nm, v, ok, v3, len(v3) != 0)
}
}
c0 := Background()
check(c0, "c0", "", "", "")
c1 := WithValue(Background(), k1, "c1k1")
check(c1, "c1", "c1k1", "", "")
if got, want := fmt.Sprint(c1), `context.Background.WithValue(1, "c1k1")`; got != want {
t.Errorf("c.String() = %q want %q", got, want)
}
c2 := WithValue(c1, k2, "c2k2")
check(c2, "c2", "c1k1", "c2k2", "")
c3 := WithValue(c2, k3, "c3k3")
check(c3, "c2", "c1k1", "c2k2", "c3k3")
c4 := WithValue(c3, k1, nil)
check(c4, "c4", "", "c2k2", "c3k3")
o0 := otherContext{Background()}
check(o0, "o0", "", "", "")
o1 := otherContext{WithValue(Background(), k1, "c1k1")}
check(o1, "o1", "c1k1", "", "")
o2 := WithValue(o1, k2, "o2k2")
check(o2, "o2", "c1k1", "o2k2", "")
o3 := otherContext{c4}
check(o3, "o3", "", "c2k2", "c3k3")
o4 := WithValue(o3, k3, nil)
check(o4, "o4", "", "c2k2", "")
}
func TestAllocs(t *testing.T) {
bg := Background()
for _, test := range []struct {
desc string
f func()
limit float64
gccgoLimit float64
}{
{
desc: "Background()",
f: func() { Background() },
limit: 0,
gccgoLimit: 0,
},
{
desc: fmt.Sprintf("WithValue(bg, %v, nil)", k1),
f: func() {
c := WithValue(bg, k1, nil)
c.Value(k1)
},
limit: 3,
gccgoLimit: 3,
},
{
desc: "WithTimeout(bg, 15*time.Millisecond)",
f: func() {
c, _ := WithTimeout(bg, 15*time.Millisecond)
<-c.Done()
},
limit: 8,
gccgoLimit: 13,
},
{
desc: "WithCancel(bg)",
f: func() {
c, cancel := WithCancel(bg)
cancel()
<-c.Done()
},
limit: 5,
gccgoLimit: 8,
},
{
desc: "WithTimeout(bg, 100*time.Millisecond)",
f: func() {
c, cancel := WithTimeout(bg, 100*time.Millisecond)
cancel()
<-c.Done()
},
limit: 8,
gccgoLimit: 25,
},
} {
limit := test.limit
if runtime.Compiler == "gccgo" {
// gccgo does not yet do escape analysis.
// TOOD(iant): Remove this when gccgo does do escape analysis.
limit = test.gccgoLimit
}
if n := testing.AllocsPerRun(100, test.f); n > limit {
t.Errorf("%s allocs = %f want %d", test.desc, n, int(limit))
}
}
}
func TestSimultaneousCancels(t *testing.T) {
root, cancel := WithCancel(Background())
m := map[Context]CancelFunc{root: cancel}
q := []Context{root}
// Create a tree of contexts.
for len(q) != 0 && len(m) < 100 {
parent := q[0]
q = q[1:]
for i := 0; i < 4; i++ {
ctx, cancel := WithCancel(parent)
m[ctx] = cancel
q = append(q, ctx)
}
}
// Start all the cancels in a random order.
var wg sync.WaitGroup
wg.Add(len(m))
for _, cancel := range m {
go func(cancel CancelFunc) {
cancel()
wg.Done()
}(cancel)
}
// Wait on all the contexts in a random order.
for ctx := range m {
select {
case <-ctx.Done():
case <-time.After(1 * time.Second):
buf := make([]byte, 10<<10)
n := runtime.Stack(buf, true)
t.Fatalf("timed out waiting for <-ctx.Done(); stacks:\n%s", buf[:n])
}
}
// Wait for all the cancel functions to return.
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(1 * time.Second):
buf := make([]byte, 10<<10)
n := runtime.Stack(buf, true)
t.Fatalf("timed out waiting for cancel functions; stacks:\n%s", buf[:n])
}
}
func TestInterlockedCancels(t *testing.T) {
parent, cancelParent := WithCancel(Background())
child, cancelChild := WithCancel(parent)
go func() {
parent.Done()
cancelChild()
}()
cancelParent()
select {
case <-child.Done():
case <-time.After(1 * time.Second):
buf := make([]byte, 10<<10)
n := runtime.Stack(buf, true)
t.Fatalf("timed out waiting for child.Done(); stacks:\n%s", buf[:n])
}
}
func TestLayersCancel(t *testing.T) {
testLayers(t, time.Now().UnixNano(), false)
}
func TestLayersTimeout(t *testing.T) {
testLayers(t, time.Now().UnixNano(), true)
}
func testLayers(t *testing.T, seed int64, testTimeout bool) {
rand.Seed(seed)
errorf := func(format string, a ...interface{}) {
t.Errorf(fmt.Sprintf("seed=%d: %s", seed, format), a...)
}
const (
timeout = 200 * time.Millisecond
minLayers = 30
)
type value int
var (
vals []*value
cancels []CancelFunc
numTimers int
ctx = Background()
)
for i := 0; i < minLayers || numTimers == 0 || len(cancels) == 0 || len(vals) == 0; i++ {
switch rand.Intn(3) {
case 0:
v := new(value)
ctx = WithValue(ctx, v, v)
vals = append(vals, v)
case 1:
var cancel CancelFunc
ctx, cancel = WithCancel(ctx)
cancels = append(cancels, cancel)
case 2:
var cancel CancelFunc
ctx, cancel = WithTimeout(ctx, timeout)
cancels = append(cancels, cancel)
numTimers++
}
}
checkValues := func(when string) {
for _, key := range vals {
if val := ctx.Value(key).(*value); key != val {
errorf("%s: ctx.Value(%p) = %p want %p", when, key, val, key)
}
}
}
select {
case <-ctx.Done():
errorf("ctx should not be canceled yet")
default:
}
if s, prefix := fmt.Sprint(ctx), "context.Background."; !strings.HasPrefix(s, prefix) {
t.Errorf("ctx.String() = %q want prefix %q", s, prefix)
}
t.Log(ctx)
checkValues("before cancel")
if testTimeout {
select {
case <-ctx.Done():
case <-time.After(timeout + timeout/10):
errorf("ctx should have timed out")
}
checkValues("after timeout")
} else {
cancel := cancels[rand.Intn(len(cancels))]
cancel()
select {
case <-ctx.Done():
default:
errorf("ctx should be canceled")
}
checkValues("after cancel")
}
}
func TestCancelRemoves(t *testing.T) {
checkChildren := func(when string, ctx Context, want int) {
if got := len(ctx.(*cancelCtx).children); got != want {
t.Errorf("%s: context has %d children, want %d", when, got, want)
}
}
ctx, _ := WithCancel(Background())
checkChildren("after creation", ctx, 0)
_, cancel := WithCancel(ctx)
checkChildren("with WithCancel child ", ctx, 1)
cancel()
checkChildren("after cancelling WithCancel child", ctx, 0)
ctx, _ = WithCancel(Background())
checkChildren("after creation", ctx, 0)
_, cancel = WithTimeout(ctx, 60*time.Minute)
checkChildren("with WithTimeout child ", ctx, 1)
cancel()
checkChildren("after cancelling WithTimeout child", ctx, 0)
}

View file

@ -0,0 +1,26 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package context_test
import (
"fmt"
"time"
"golang.org/x/net/context"
)
func ExampleWithTimeout() {
// Pass a context with a timeout to tell a blocking function that it
// should abandon its work after the timeout elapses.
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
select {
case <-time.After(200 * time.Millisecond):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
}
// Output:
// context deadline exceeded
}

170
promql/analyzer.go Normal file
View file

@ -0,0 +1,170 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql
import (
"errors"
"time"
"golang.org/x/net/context"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local"
)
// An Analyzer traverses an expression and determines which data has to be requested
// from the storage. It is bound to a context that allows cancellation and timing out.
type Analyzer struct {
// The storage from which to query data.
Storage local.Storage
// The expression being analyzed.
Expr Expr
// The time range for evaluation of Expr.
Start, End clientmodel.Timestamp
// The preload times for different query time offsets.
offsetPreloadTimes map[time.Duration]preloadTimes
}
// preloadTimes tracks which instants or ranges to preload for a set of
// fingerprints. One of these structs is collected for each offset by the query
// analyzer.
type preloadTimes struct {
// Instants require single samples to be loaded along the entire query
// range, with intervals between the samples corresponding to the query
// resolution.
instants map[clientmodel.Fingerprint]struct{}
// Ranges require loading a range of samples at each resolution step,
// stretching backwards from the current evaluation timestamp. The length of
// the range into the past is given by the duration, as in "foo[5m]".
ranges map[clientmodel.Fingerprint]time.Duration
}
// Analyze the provided expression and attach metrics and fingerprints to data-selecting
// AST nodes that are later used to preload the data from the storage.
func (a *Analyzer) Analyze(ctx context.Context) error {
a.offsetPreloadTimes = map[time.Duration]preloadTimes{}
getPreloadTimes := func(offset time.Duration) preloadTimes {
if _, ok := a.offsetPreloadTimes[offset]; !ok {
a.offsetPreloadTimes[offset] = preloadTimes{
instants: map[clientmodel.Fingerprint]struct{}{},
ranges: map[clientmodel.Fingerprint]time.Duration{},
}
}
return a.offsetPreloadTimes[offset]
}
// Retrieve fingerprints and metrics for the required time range for
// each metric or matrix selector node.
Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
pt := getPreloadTimes(n.Offset)
fpts := a.Storage.GetFingerprintsForLabelMatchers(n.LabelMatchers)
n.fingerprints = fpts
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
for _, fp := range fpts {
// Only add the fingerprint to the instants if not yet present in the
// ranges. Ranges always contain more points and span more time than
// instants for the same offset.
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
pt.instants[fp] = struct{}{}
}
n.metrics[fp] = a.Storage.GetMetricForFingerprint(fp)
}
case *MatrixSelector:
pt := getPreloadTimes(n.Offset)
fpts := a.Storage.GetFingerprintsForLabelMatchers(n.LabelMatchers)
n.fingerprints = fpts
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
for _, fp := range fpts {
if pt.ranges[fp] < n.Range {
pt.ranges[fp] = n.Range
// Delete the fingerprint from the instants. Ranges always contain more
// points and span more time than instants, so we don't need to track
// an instant for the same fingerprint, should we have one.
delete(pt.instants, fp)
}
n.metrics[fp] = a.Storage.GetMetricForFingerprint(fp)
}
}
return true
})
// Currently we do not return an error but we might place a context check in here
// or extend the stage in some other way.
return nil
}
// Prepare the expression evaluation by preloading all required chunks from the storage
// and setting the respective storage iterators in the AST nodes.
func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
const env = "query preparation"
if a.offsetPreloadTimes == nil {
return nil, errors.New("analysis must be performed before preparing query")
}
var err error
// The preloader must not be closed unless an error ocurred as closing
// unpins the preloaded chunks.
p := a.Storage.NewPreloader()
defer func() {
if err != nil {
p.Close()
}
}()
// Preload all analyzed ranges.
for offset, pt := range a.offsetPreloadTimes {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
start := a.Start.Add(-offset)
end := a.End.Add(-offset)
for fp, rangeDuration := range pt.ranges {
err = p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta)
if err != nil {
return nil, err
}
}
for fp := range pt.instants {
err = p.PreloadRange(fp, start, end, *stalenessDelta)
if err != nil {
return nil, err
}
}
}
// Attach storage iterators to AST nodes.
Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
for _, fp := range n.fingerprints {
n.iterators[fp] = a.Storage.NewIterator(fp)
}
case *MatrixSelector:
for _, fp := range n.fingerprints {
n.iterators[fp] = a.Storage.NewIterator(fp)
}
}
return true
})
return p, nil
}

View file

@ -180,7 +180,7 @@ type ParenExpr struct {
// StringLiteral represents a string. // StringLiteral represents a string.
type StringLiteral struct { type StringLiteral struct {
Str string Val string
} }
// UnaryExpr represents a unary operation on another expression. // UnaryExpr represents a unary operation on another expression.

1219
promql/engine.go Normal file

File diff suppressed because it is too large Load diff

272
promql/engine_test.go Normal file
View file

@ -0,0 +1,272 @@
package promql
import (
"reflect"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local"
)
func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 5 * time.Millisecond
defer func() {
// Restore default query timeout
*defaultQueryTimeout = 2 * time.Minute
}()
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop()
query, err := engine.NewQuery("foo = bar")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
// Timeouts are not exact but checked in designated places. For example between
// invoking handlers. Thus, we reigster two handlers that take some time to ensure we check
// after exceeding the timeout.
// Should the implementation of this area change, the test might have to be adjusted.
engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error {
time.Sleep(10 * time.Millisecond)
return nil
})
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
time.Sleep(10 * time.Millisecond)
return nil
})
res := query.Exec()
if res.Err == nil {
t.Fatalf("expected timeout error but got none")
}
if _, ok := res.Err.(ErrQueryTimeout); res.Err != nil && !ok {
t.Fatalf("expected timeout error but got: %s", res.Err)
}
}
func TestQueryCancel(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop()
query1, err := engine.NewQuery("foo = bar")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
query2, err := engine.NewQuery("foo = baz")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
// As for timeouts, cancellation is only checked at designated points. We ensure
// that we reach one of those points using the same method.
engine.RegisterRecordHandler("test1", func(context.Context, *RecordStmt) error {
<-time.After(2 * time.Millisecond)
return nil
})
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
<-time.After(2 * time.Millisecond)
return nil
})
// Cancel query after starting it.
var wg sync.WaitGroup
var res *Result
wg.Add(1)
go func() {
res = query1.Exec()
wg.Done()
}()
<-time.After(1 * time.Millisecond)
query1.Cancel()
wg.Wait()
if res.Err == nil {
t.Fatalf("expected cancellation error for query1 but got none")
}
if _, ok := res.Err.(ErrQueryCanceled); res.Err != nil && !ok {
t.Fatalf("expected cancellation error for query1 but got: %s", res.Err)
}
// Canceling query before starting it must have no effect.
query2.Cancel()
res = query2.Exec()
if res.Err != nil {
t.Fatalf("unexpeceted error on executing query2: %s", res.Err)
}
}
func TestEngineShutdown(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
query1, err := engine.NewQuery("foo = bar")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
query2, err := engine.NewQuery("foo = baz")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
handlerExecutions := 0
// Shutdown engine on first handler execution. Should handler execution ever become
// concurrent this test has to be adjusted accordingly.
engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error {
handlerExecutions++
engine.Stop()
time.Sleep(10 * time.Millisecond)
return nil
})
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
handlerExecutions++
engine.Stop()
time.Sleep(10 * time.Millisecond)
return nil
})
// Stopping the engine should cancel the base context. While setting up queries is
// still possible their context is canceled from the beginning and execution should
// terminate immediately.
res := query1.Exec()
if res.Err == nil {
t.Fatalf("expected error on shutdown during query but got none")
}
if handlerExecutions != 1 {
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executons", handlerExecutions)
}
res2 := query2.Exec()
if res2.Err == nil {
t.Fatalf("expected error on querying shutdown engine but got none")
}
if handlerExecutions != 1 {
t.Fatalf("expected no handler execution for query after engine shutdown")
}
}
func TestAlertHandler(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop()
qs := `ALERT Foo IF bar FOR 5m WITH {a="b"} SUMMARY "sum" DESCRIPTION "desc"`
doQuery := func(expectFailure bool) *AlertStmt {
query, err := engine.NewQuery(qs)
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
res := query.Exec()
if expectFailure && res.Err == nil {
t.Fatalf("expected error but got none.")
}
if res.Err != nil && !expectFailure {
t.Fatalf("error on executing alert query: %s", res.Err)
}
// That this alert statement is correct is tested elsewhere.
return query.Statements()[0].(*AlertStmt)
}
// We expect an error if nothing is registered to handle the query.
alertStmt := doQuery(true)
receivedCalls := 0
// Ensure that we receive the correct statement.
engine.RegisterAlertHandler("test", func(ctx context.Context, as *AlertStmt) error {
if !reflect.DeepEqual(alertStmt, as) {
t.Errorf("received alert statement did not match input: %q", qs)
t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(alertStmt), Tree(as))
}
receivedCalls++
return nil
})
for i := 0; i < 10; i++ {
doQuery(false)
if receivedCalls != i+1 {
t.Fatalf("alert handler was not called on query execution")
}
}
engine.UnregisterAlertHandler("test")
// We must receive no further calls after unregistering.
doQuery(true)
if receivedCalls != 10 {
t.Fatalf("received calls after unregistering alert handler")
}
}
func TestRecordHandler(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop()
qs := `foo = bar`
doQuery := func(expectFailure bool) *RecordStmt {
query, err := engine.NewQuery(qs)
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
res := query.Exec()
if expectFailure && res.Err == nil {
t.Fatalf("expected error but got none.")
}
if res.Err != nil && !expectFailure {
t.Fatalf("error on executing record query: %s", res.Err)
}
return query.Statements()[0].(*RecordStmt)
}
// We expect an error if nothing is registered to handle the query.
recordStmt := doQuery(true)
receivedCalls := 0
// Ensure that we receive the correct statement.
engine.RegisterRecordHandler("test", func(ctx context.Context, rs *RecordStmt) error {
if !reflect.DeepEqual(recordStmt, rs) {
t.Errorf("received record statement did not match input: %q", qs)
t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(recordStmt), Tree(rs))
}
receivedCalls++
return nil
})
for i := 0; i < 10; i++ {
doQuery(false)
if receivedCalls != i+1 {
t.Fatalf("record handler was not called on query execution")
}
}
engine.UnregisterRecordHandler("test")
// We must receive no further calls after unregistering.
doQuery(true)
if receivedCalls != 10 {
t.Fatalf("received calls after unregistering record handler")
}
}

View file

@ -13,6 +13,18 @@
package promql package promql
import (
"container/heap"
"math"
"sort"
"strconv"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
// Function represents a function of the expression language and is // Function represents a function of the expression language and is
// used by function nodes. // used by function nodes.
type Function struct { type Function struct {
@ -20,7 +32,477 @@ type Function struct {
ArgTypes []ExprType ArgTypes []ExprType
OptionalArgs int OptionalArgs int
ReturnType ExprType ReturnType ExprType
Call func() Call func(ev *evaluator, args Expressions) Value
}
// === time() clientmodel.SampleValue ===
func funcTime(ev *evaluator, args Expressions) Value {
return &Scalar{
Value: clientmodel.SampleValue(ev.Timestamp.Unix()),
Timestamp: ev.Timestamp,
}
}
// === delta(matrix ExprMatrix, isCounter=0 ExprScalar) Vector ===
func funcDelta(ev *evaluator, args Expressions) Value {
isCounter := len(args) >= 2 && ev.evalInt(args[1]) > 0
resultVector := Vector{}
// If we treat these metrics as counters, we need to fetch all values
// in the interval to find breaks in the timeseries' monotonicity.
// I.e. if a counter resets, we want to ignore that reset.
var matrixValue Matrix
if isCounter {
matrixValue = ev.evalMatrix(args[0])
} else {
matrixValue = ev.evalMatrixBounds(args[0])
}
for _, samples := range matrixValue {
// No sense in trying to compute a delta without at least two points. Drop
// this vector element.
if len(samples.Values) < 2 {
continue
}
counterCorrection := clientmodel.SampleValue(0)
lastValue := clientmodel.SampleValue(0)
for _, sample := range samples.Values {
currentValue := sample.Value
if isCounter && currentValue < lastValue {
counterCorrection += lastValue - currentValue
}
lastValue = currentValue
}
resultValue := lastValue - samples.Values[0].Value + counterCorrection
targetInterval := args[0].(*MatrixSelector).Range
sampledInterval := samples.Values[len(samples.Values)-1].Timestamp.Sub(samples.Values[0].Timestamp)
if sampledInterval == 0 {
// Only found one sample. Cannot compute a rate from this.
continue
}
// Correct for differences in target vs. actual delta interval.
//
// Above, we didn't actually calculate the delta for the specified target
// interval, but for an interval between the first and last found samples
// under the target interval, which will usually have less time between
// them. Depending on how many samples are found under a target interval,
// the delta results are distorted and temporal aliasing occurs (ugly
// bumps). This effect is corrected for below.
intervalCorrection := clientmodel.SampleValue(targetInterval) / clientmodel.SampleValue(sampledInterval)
resultValue *= intervalCorrection
resultSample := &Sample{
Metric: samples.Metric,
Value: resultValue,
Timestamp: ev.Timestamp,
}
resultSample.Metric.Delete(clientmodel.MetricNameLabel)
resultVector = append(resultVector, resultSample)
}
return resultVector
}
// === rate(node ExprMatrix) Vector ===
func funcRate(ev *evaluator, args Expressions) Value {
args = append(args, &NumberLiteral{1})
vector := funcDelta(ev, args).(Vector)
// TODO: could be other type of ExprMatrix in the future (right now, only
// MatrixSelector exists). Find a better way of getting the duration of a
// matrix, such as looking at the samples themselves.
interval := args[0].(*MatrixSelector).Range
for i := range vector {
vector[i].Value /= clientmodel.SampleValue(interval / time.Second)
}
return vector
}
// === sort(node ExprVector) Vector ===
func funcSort(ev *evaluator, args Expressions) Value {
byValueSorter := vectorByValueHeap(ev.evalVector(args[0]))
sort.Sort(byValueSorter)
return Vector(byValueSorter)
}
// === sortDesc(node ExprVector) Vector ===
func funcSortDesc(ev *evaluator, args Expressions) Value {
byValueSorter := vectorByValueHeap(ev.evalVector(args[0]))
sort.Sort(sort.Reverse(byValueSorter))
return Vector(byValueSorter)
}
// === topk(k ExprScalar, node ExprVector) Vector ===
func funcTopk(ev *evaluator, args Expressions) Value {
k := ev.evalInt(args[0])
if k < 1 {
return Vector{}
}
vector := ev.evalVector(args[1])
topk := make(vectorByValueHeap, 0, k)
for _, el := range vector {
if len(topk) < k || topk[0].Value < el.Value {
if len(topk) == k {
heap.Pop(&topk)
}
heap.Push(&topk, el)
}
}
sort.Sort(sort.Reverse(topk))
return Vector(topk)
}
// === bottomk(k ExprScalar, node ExprVector) Vector ===
func funcBottomk(ev *evaluator, args Expressions) Value {
k := ev.evalInt(args[0])
if k < 1 {
return Vector{}
}
vector := ev.evalVector(args[1])
bottomk := make(vectorByValueHeap, 0, k)
bkHeap := reverseHeap{Interface: &bottomk}
for _, el := range vector {
if len(bottomk) < k || bottomk[0].Value > el.Value {
if len(bottomk) == k {
heap.Pop(&bkHeap)
}
heap.Push(&bkHeap, el)
}
}
sort.Sort(bottomk)
return Vector(bottomk)
}
// === drop_common_labels(node ExprVector) Vector ===
func funcDropCommonLabels(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
if len(vector) < 1 {
return Vector{}
}
common := clientmodel.LabelSet{}
for k, v := range vector[0].Metric.Metric {
// TODO(julius): Should we also drop common metric names?
if k == clientmodel.MetricNameLabel {
continue
}
common[k] = v
}
for _, el := range vector[1:] {
for k, v := range common {
if el.Metric.Metric[k] != v {
// Deletion of map entries while iterating over them is safe.
// From http://golang.org/ref/spec#For_statements:
// "If map entries that have not yet been reached are deleted during
// iteration, the corresponding iteration values will not be produced."
delete(common, k)
}
}
}
for _, el := range vector {
for k := range el.Metric.Metric {
if _, ok := common[k]; ok {
el.Metric.Delete(k)
}
}
}
return vector
}
// === round(vector ExprVector, toNearest=1 Scalar) Vector ===
func funcRound(ev *evaluator, args Expressions) Value {
// round returns a number rounded to toNearest.
// Ties are solved by rounding up.
toNearest := float64(1)
if len(args) >= 2 {
toNearest = ev.evalFloat(args[1])
}
// Invert as it seems to cause fewer floating point accuracy issues.
toNearestInverse := 1.0 / toNearest
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Floor(float64(el.Value)*toNearestInverse+0.5) / toNearestInverse)
}
return vector
}
// === scalar(node ExprVector) Scalar ===
func funcScalar(ev *evaluator, args Expressions) Value {
v := ev.evalVector(args[0])
if len(v) != 1 {
return &Scalar{clientmodel.SampleValue(math.NaN()), ev.Timestamp}
}
return &Scalar{clientmodel.SampleValue(v[0].Value), ev.Timestamp}
}
// === count_scalar(vector ExprVector) model.SampleValue ===
func funcCountScalar(ev *evaluator, args Expressions) Value {
return &Scalar{
Value: clientmodel.SampleValue(len(ev.evalVector(args[0]))),
Timestamp: ev.Timestamp,
}
}
func aggrOverTime(ev *evaluator, args Expressions, aggrFn func(metric.Values) clientmodel.SampleValue) Value {
matrix := ev.evalMatrix(args[0])
resultVector := Vector{}
for _, el := range matrix {
if len(el.Values) == 0 {
continue
}
el.Metric.Delete(clientmodel.MetricNameLabel)
resultVector = append(resultVector, &Sample{
Metric: el.Metric,
Value: aggrFn(el.Values),
Timestamp: ev.Timestamp,
})
}
return resultVector
}
// === avg_over_time(matrix ExprMatrix) Vector ===
func funcAvgOverTime(ev *evaluator, args Expressions) Value {
return aggrOverTime(ev, args, func(values metric.Values) clientmodel.SampleValue {
var sum clientmodel.SampleValue
for _, v := range values {
sum += v.Value
}
return sum / clientmodel.SampleValue(len(values))
})
}
// === count_over_time(matrix ExprMatrix) Vector ===
func funcCountOverTime(ev *evaluator, args Expressions) Value {
return aggrOverTime(ev, args, func(values metric.Values) clientmodel.SampleValue {
return clientmodel.SampleValue(len(values))
})
}
// === floor(vector ExprVector) Vector ===
func funcFloor(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Floor(float64(el.Value)))
}
return vector
}
// === max_over_time(matrix ExprMatrix) Vector ===
func funcMaxOverTime(ev *evaluator, args Expressions) Value {
return aggrOverTime(ev, args, func(values metric.Values) clientmodel.SampleValue {
max := math.Inf(-1)
for _, v := range values {
max = math.Max(max, float64(v.Value))
}
return clientmodel.SampleValue(max)
})
}
// === min_over_time(matrix ExprMatrix) Vector ===
func funcMinOverTime(ev *evaluator, args Expressions) Value {
return aggrOverTime(ev, args, func(values metric.Values) clientmodel.SampleValue {
min := math.Inf(1)
for _, v := range values {
min = math.Min(min, float64(v.Value))
}
return clientmodel.SampleValue(min)
})
}
// === sum_over_time(matrix ExprMatrix) Vector ===
func funcSumOverTime(ev *evaluator, args Expressions) Value {
return aggrOverTime(ev, args, func(values metric.Values) clientmodel.SampleValue {
var sum clientmodel.SampleValue
for _, v := range values {
sum += v.Value
}
return sum
})
}
// === abs(vector ExprVector) Vector ===
func funcAbs(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Abs(float64(el.Value)))
}
return vector
}
// === absent(vector ExprVector) Vector ===
func funcAbsent(ev *evaluator, args Expressions) Value {
if len(ev.evalVector(args[0])) > 0 {
return Vector{}
}
m := clientmodel.Metric{}
if vs, ok := args[0].(*VectorSelector); ok {
for _, matcher := range vs.LabelMatchers {
if matcher.Type == metric.Equal && matcher.Name != clientmodel.MetricNameLabel {
m[matcher.Name] = matcher.Value
}
}
}
return Vector{
&Sample{
Metric: clientmodel.COWMetric{
Metric: m,
Copied: true,
},
Value: 1,
Timestamp: ev.Timestamp,
},
}
}
// === ceil(vector ExprVector) Vector ===
func funcCeil(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Ceil(float64(el.Value)))
}
return vector
}
// === exp(vector ExprVector) Vector ===
func funcExp(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Exp(float64(el.Value)))
}
return vector
}
// === sqrt(vector VectorNode) Vector ===
func funcSqrt(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Sqrt(float64(el.Value)))
}
return vector
}
// === ln(vector ExprVector) Vector ===
func funcLn(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Log(float64(el.Value)))
}
return vector
}
// === log2(vector ExprVector) Vector ===
func funcLog2(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Log2(float64(el.Value)))
}
return vector
}
// === log10(vector ExprVector) Vector ===
func funcLog10(ev *evaluator, args Expressions) Value {
vector := ev.evalVector(args[0])
for _, el := range vector {
el.Metric.Delete(clientmodel.MetricNameLabel)
el.Value = clientmodel.SampleValue(math.Log10(float64(el.Value)))
}
return vector
}
// === deriv(node ExprMatrix) Vector ===
func funcDeriv(ev *evaluator, args Expressions) Value {
resultVector := Vector{}
matrix := ev.evalMatrix(args[0])
for _, samples := range matrix {
// No sense in trying to compute a derivative without at least two points.
// Drop this vector element.
if len(samples.Values) < 2 {
continue
}
// Least squares.
n := clientmodel.SampleValue(0)
sumY := clientmodel.SampleValue(0)
sumX := clientmodel.SampleValue(0)
sumXY := clientmodel.SampleValue(0)
sumX2 := clientmodel.SampleValue(0)
for _, sample := range samples.Values {
x := clientmodel.SampleValue(sample.Timestamp.UnixNano() / 1e9)
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
numerator := sumXY - sumX*sumY/n
denominator := sumX2 - (sumX*sumX)/n
resultValue := numerator / denominator
resultSample := &Sample{
Metric: samples.Metric,
Value: resultValue,
Timestamp: ev.Timestamp,
}
resultSample.Metric.Delete(clientmodel.MetricNameLabel)
resultVector = append(resultVector, resultSample)
}
return resultVector
}
// === histogram_quantile(k ExprScalar, vector ExprVector) Vector ===
func funcHistogramQuantile(ev *evaluator, args Expressions) Value {
q := clientmodel.SampleValue(ev.evalFloat(args[0]))
inVec := ev.evalVector(args[1])
outVec := Vector{}
signatureToMetricWithBuckets := map[uint64]*metricWithBuckets{}
for _, el := range inVec {
upperBound, err := strconv.ParseFloat(
string(el.Metric.Metric[clientmodel.BucketLabel]), 64,
)
if err != nil {
// Oops, no bucket label or malformed label value. Skip.
// TODO(beorn7): Issue a warning somehow.
continue
}
signature := clientmodel.SignatureWithoutLabels(el.Metric.Metric, excludedLabels)
mb, ok := signatureToMetricWithBuckets[signature]
if !ok {
el.Metric.Delete(clientmodel.BucketLabel)
el.Metric.Delete(clientmodel.MetricNameLabel)
mb = &metricWithBuckets{el.Metric, nil}
signatureToMetricWithBuckets[signature] = mb
}
mb.buckets = append(mb.buckets, bucket{upperBound, el.Value})
}
for _, mb := range signatureToMetricWithBuckets {
outVec = append(outVec, &Sample{
Metric: mb.metric,
Value: clientmodel.SampleValue(quantile(q, mb.buckets)),
Timestamp: ev.Timestamp,
})
}
return outVec
} }
var functions = map[string]*Function{ var functions = map[string]*Function{
@ -28,164 +510,207 @@ var functions = map[string]*Function{
Name: "abs", Name: "abs",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcAbs,
}, },
"absent": { "absent": {
Name: "absent", Name: "absent",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcAbsent,
}, },
"avg_over_time": { "avg_over_time": {
Name: "avg_over_time", Name: "avg_over_time",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcAvgOverTime,
}, },
"bottomk": { "bottomk": {
Name: "bottomk", Name: "bottomk",
ArgTypes: []ExprType{ExprScalar, ExprVector}, ArgTypes: []ExprType{ExprScalar, ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcBottomk,
}, },
"ceil": { "ceil": {
Name: "ceil", Name: "ceil",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcCeil,
}, },
"count_over_time": { "count_over_time": {
Name: "count_over_time", Name: "count_over_time",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcCountOverTime,
}, },
"count_scalar": { "count_scalar": {
Name: "count_scalar", Name: "count_scalar",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprScalar, ReturnType: ExprScalar,
Call: func() {}, Call: funcCountScalar,
}, },
"delta": { "delta": {
Name: "delta", Name: "delta",
ArgTypes: []ExprType{ExprMatrix, ExprScalar}, ArgTypes: []ExprType{ExprMatrix, ExprScalar},
OptionalArgs: 1, // The 2nd argument is deprecated. OptionalArgs: 1, // The 2nd argument is deprecated.
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcDelta,
}, },
"deriv": { "deriv": {
Name: "deriv", Name: "deriv",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcDeriv,
}, },
"drop_common_labels": { "drop_common_labels": {
Name: "drop_common_labels", Name: "drop_common_labels",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcDropCommonLabels,
}, },
"exp": { "exp": {
Name: "exp", Name: "exp",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcExp,
}, },
"floor": { "floor": {
Name: "floor", Name: "floor",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcFloor,
}, },
"histogram_quantile": { "histogram_quantile": {
Name: "histogram_quantile", Name: "histogram_quantile",
ArgTypes: []ExprType{ExprScalar, ExprVector}, ArgTypes: []ExprType{ExprScalar, ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcHistogramQuantile,
}, },
"ln": { "ln": {
Name: "ln", Name: "ln",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcLn,
}, },
"log10": { "log10": {
Name: "log10", Name: "log10",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcLog10,
}, },
"log2": { "log2": {
Name: "log2", Name: "log2",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcLog2,
}, },
"max_over_time": { "max_over_time": {
Name: "max_over_time", Name: "max_over_time",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcMaxOverTime,
}, },
"min_over_time": { "min_over_time": {
Name: "min_over_time", Name: "min_over_time",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcMinOverTime,
}, },
"rate": { "rate": {
Name: "rate", Name: "rate",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcRate,
}, },
"round": { "round": {
Name: "round", Name: "round",
ArgTypes: []ExprType{ExprVector, ExprScalar}, ArgTypes: []ExprType{ExprVector, ExprScalar},
OptionalArgs: 1, OptionalArgs: 1,
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcRound,
}, },
"scalar": { "scalar": {
Name: "scalar", Name: "scalar",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprScalar, ReturnType: ExprScalar,
Call: func() {}, Call: funcScalar,
}, },
"sort": { "sort": {
Name: "sort", Name: "sort",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcSort,
}, },
"sort_desc": { "sort_desc": {
Name: "sort_desc", Name: "sort_desc",
ArgTypes: []ExprType{ExprVector}, ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcSortDesc,
},
"sqrt": {
Name: "sqrt",
ArgTypes: []ExprType{ExprVector},
ReturnType: ExprVector,
Call: funcSqrt,
}, },
"sum_over_time": { "sum_over_time": {
Name: "sum_over_time", Name: "sum_over_time",
ArgTypes: []ExprType{ExprMatrix}, ArgTypes: []ExprType{ExprMatrix},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcSumOverTime,
}, },
"time": { "time": {
Name: "time", Name: "time",
ArgTypes: []ExprType{}, ArgTypes: []ExprType{},
ReturnType: ExprScalar, ReturnType: ExprScalar,
Call: func() {}, Call: funcTime,
}, },
"topk": { "topk": {
Name: "topk", Name: "topk",
ArgTypes: []ExprType{ExprScalar, ExprVector}, ArgTypes: []ExprType{ExprScalar, ExprVector},
ReturnType: ExprVector, ReturnType: ExprVector,
Call: func() {}, Call: funcTopk,
}, },
} }
// GetFunction returns a predefined Function object for the given name. // getFunction returns a predefined Function object for the given name.
func GetFunction(name string) (*Function, bool) { func getFunction(name string) (*Function, bool) {
function, ok := functions[name] function, ok := functions[name]
return function, ok return function, ok
} }
type vectorByValueHeap Vector
func (s vectorByValueHeap) Len() int {
return len(s)
}
func (s vectorByValueHeap) Less(i, j int) bool {
if math.IsNaN(float64(s[i].Value)) {
return true
}
return s[i].Value < s[j].Value
}
func (s vectorByValueHeap) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s *vectorByValueHeap) Push(x interface{}) {
*s = append(*s, x.(*Sample))
}
func (s *vectorByValueHeap) Pop() interface{} {
old := *s
n := len(old)
el := old[n-1]
*s = old[0 : n-1]
return el
}
type reverseHeap struct {
heap.Interface
}
func (s reverseHeap) Less(i, j int) bool {
return s.Interface.Less(j, i)
}

View file

@ -565,7 +565,7 @@ func (p *parser) aggrExpr() *AggregateExpr {
func (p *parser) call(name string) *Call { func (p *parser) call(name string) *Call {
const ctx = "function call" const ctx = "function call"
fn, exist := GetFunction(name) fn, exist := getFunction(name)
if !exist { if !exist {
p.errorf("unknown function with name %q", name) p.errorf("unknown function with name %q", name)
} }

View file

@ -1069,7 +1069,7 @@ func mustLabelMatcher(mt metric.MatchType, name clientmodel.LabelName, val clien
} }
func mustGetFunction(name string) *Function { func mustGetFunction(name string) *Function {
f, ok := GetFunction(name) f, ok := getFunction(name)
if !ok { if !ok {
panic(fmt.Errorf("function %q does not exist", name)) panic(fmt.Errorf("function %q does not exist", name))
} }

View file

@ -25,6 +25,47 @@ import (
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
func (matrix Matrix) String() string {
metricStrings := make([]string, 0, len(matrix))
for _, sampleStream := range matrix {
metricName, hasName := sampleStream.Metric.Metric[clientmodel.MetricNameLabel]
numLabels := len(sampleStream.Metric.Metric)
if hasName {
numLabels--
}
labelStrings := make([]string, 0, numLabels)
for label, value := range sampleStream.Metric.Metric {
if label != clientmodel.MetricNameLabel {
labelStrings = append(labelStrings, fmt.Sprintf("%s=%q", label, value))
}
}
sort.Strings(labelStrings)
valueStrings := make([]string, 0, len(sampleStream.Values))
for _, value := range sampleStream.Values {
valueStrings = append(valueStrings,
fmt.Sprintf("\n%v @[%v]", value.Value, value.Timestamp))
}
metricStrings = append(metricStrings,
fmt.Sprintf("%s{%s} => %s",
metricName,
strings.Join(labelStrings, ", "),
strings.Join(valueStrings, ", ")))
}
sort.Strings(metricStrings)
return strings.Join(metricStrings, "\n")
}
func (vector Vector) String() string {
metricStrings := make([]string, 0, len(vector))
for _, sample := range vector {
metricStrings = append(metricStrings,
fmt.Sprintf("%s => %v @[%v]",
sample.Metric,
sample.Value, sample.Timestamp))
}
return strings.Join(metricStrings, "\n")
}
// Tree returns a string of the tree structure of the given node. // Tree returns a string of the tree structure of the given node.
func Tree(node Node) string { func Tree(node Node) string {
return tree(node, "") return tree(node, "")
@ -175,7 +216,7 @@ func (node *ParenExpr) String() string {
} }
func (node *StringLiteral) String() string { func (node *StringLiteral) String() string {
return fmt.Sprintf("%q", node.Str) return fmt.Sprintf("%q", node.Val)
} }
func (node *UnaryExpr) String() string { func (node *UnaryExpr) String() string {
@ -321,7 +362,7 @@ func (node *MatrixSelector) DotGraph() string {
// DotGraph returns a DOT representation of the string literal. // DotGraph returns a DOT representation of the string literal.
func (node *StringLiteral) DotGraph() string { func (node *StringLiteral) DotGraph() string {
return fmt.Sprintf("%#p[label=\"'%q'\"];\n", node, node.Str) return fmt.Sprintf("%#p[label=\"'%q'\"];\n", node, node.Val)
} }
// DotGraph returns a DOT representation of the unary expression. // DotGraph returns a DOT representation of the unary expression.

1656
promql/promql_test.go Normal file

File diff suppressed because it is too large Load diff

106
promql/quantile.go Normal file
View file

@ -0,0 +1,106 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql
import (
"math"
"sort"
clientmodel "github.com/prometheus/client_golang/model"
)
// Helpers to calculate quantiles.
// excludedLabels are the labels to exclude from signature calculation for
// quantiles.
var excludedLabels = map[clientmodel.LabelName]struct{}{
clientmodel.MetricNameLabel: struct{}{},
clientmodel.BucketLabel: struct{}{},
}
type bucket struct {
upperBound float64
count clientmodel.SampleValue
}
// buckets implements sort.Interface.
type buckets []bucket
func (b buckets) Len() int { return len(b) }
func (b buckets) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b buckets) Less(i, j int) bool { return b[i].upperBound < b[j].upperBound }
type metricWithBuckets struct {
metric clientmodel.COWMetric
buckets buckets
}
// quantile calculates the quantile 'q' based on the given buckets. The buckets
// will be sorted by upperBound by this function (i.e. no sorting needed before
// calling this function). The quantile value is interpolated assuming a linear
// distribution within a bucket. However, if the quantile falls into the highest
// bucket, the upper bound of the 2nd highest bucket is returned. A natural
// lower bound of 0 is assumed if the upper bound of the lowest bucket is
// greater 0. In that case, interpolation in the lowest bucket happens linearly
// between 0 and the upper bound of the lowest bucket. However, if the lowest
// bucket has an upper bound less or equal 0, this upper bound is returned if
// the quantile falls into the lowest bucket.
//
// There are a number of special cases (once we have a way to report errors
// happening during evaluations of AST functions, we should report those
// explicitly):
//
// If 'buckets' has fewer than 2 elements, NaN is returned.
//
// If the highest bucket is not +Inf, NaN is returned.
//
// If q<0, -Inf is returned.
//
// If q>1, +Inf is returned.
func quantile(q clientmodel.SampleValue, buckets buckets) float64 {
if q < 0 {
return math.Inf(-1)
}
if q > 1 {
return math.Inf(+1)
}
if len(buckets) < 2 {
return math.NaN()
}
sort.Sort(buckets)
if !math.IsInf(buckets[len(buckets)-1].upperBound, +1) {
return math.NaN()
}
rank := q * buckets[len(buckets)-1].count
b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
if b == len(buckets)-1 {
return buckets[len(buckets)-2].upperBound
}
if b == 0 && buckets[0].upperBound <= 0 {
return buckets[0].upperBound
}
var (
bucketStart float64
bucketEnd = buckets[b].upperBound
count = buckets[b].count
)
if b > 0 {
bucketStart = buckets[b-1].upperBound
count -= buckets[b-1].count
rank -= buckets[b-1].count
}
return bucketStart + (bucketEnd-bucketStart)*float64(rank/count)
}

486
promql/setup_test.go Normal file
View file

@ -0,0 +1,486 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql
import (
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
)
var testSampleInterval = time.Duration(5) * time.Minute
var testStartTime = clientmodel.Timestamp(0)
func getTestValueStream(startVal, endVal, stepVal clientmodel.SampleValue, startTime clientmodel.Timestamp) (resultValues metric.Values) {
currentTime := startTime
for currentVal := startVal; currentVal <= endVal; currentVal += stepVal {
sample := metric.SamplePair{
Value: currentVal,
Timestamp: currentTime,
}
resultValues = append(resultValues, sample)
currentTime = currentTime.Add(testSampleInterval)
}
return resultValues
}
func getTestVectorFromTestMatrix(matrix Matrix) Vector {
vector := Vector{}
for _, sampleStream := range matrix {
lastSample := sampleStream.Values[len(sampleStream.Values)-1]
vector = append(vector, &Sample{
Metric: sampleStream.Metric,
Value: lastSample.Value,
Timestamp: lastSample.Timestamp,
})
}
return vector
}
func storeMatrix(storage local.Storage, matrix Matrix) {
pendingSamples := clientmodel.Samples{}
for _, sampleStream := range matrix {
for _, sample := range sampleStream.Values {
pendingSamples = append(pendingSamples, &clientmodel.Sample{
Metric: sampleStream.Metric.Metric,
Value: sample.Value,
Timestamp: sample.Timestamp,
})
}
}
for _, s := range pendingSamples {
storage.Append(s)
}
storage.WaitForIndexing()
}
var testVector = getTestVectorFromTestMatrix(testMatrix)
var testMatrix = Matrix{
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "0",
"group": "production",
},
},
Values: getTestValueStream(0, 100, 10, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "1",
"group": "production",
},
},
Values: getTestValueStream(0, 200, 20, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "0",
"group": "canary",
},
},
Values: getTestValueStream(0, 300, 30, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "1",
"group": "canary",
},
},
Values: getTestValueStream(0, 400, 40, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "app-server",
"instance": "0",
"group": "production",
},
},
Values: getTestValueStream(0, 500, 50, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "app-server",
"instance": "1",
"group": "production",
},
},
Values: getTestValueStream(0, 600, 60, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "app-server",
"instance": "0",
"group": "canary",
},
},
Values: getTestValueStream(0, 700, 70, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "app-server",
"instance": "1",
"group": "canary",
},
},
Values: getTestValueStream(0, 800, 80, testStartTime),
},
// Single-letter metric and label names.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "x",
"y": "testvalue",
},
},
Values: getTestValueStream(0, 100, 10, testStartTime),
},
// Counter reset in the middle of range.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testcounter_reset_middle",
},
},
Values: append(getTestValueStream(0, 40, 10, testStartTime), getTestValueStream(0, 50, 10, testStartTime.Add(testSampleInterval*5))...),
},
// Counter reset at the end of range.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testcounter_reset_end",
},
},
Values: append(getTestValueStream(0, 90, 10, testStartTime), getTestValueStream(0, 0, 10, testStartTime.Add(testSampleInterval*10))...),
},
// For label-key grouping regression test.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "label_grouping_test",
"a": "aa",
"b": "bb",
},
},
Values: getTestValueStream(0, 100, 10, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "label_grouping_test",
"a": "a",
"b": "abb",
},
},
Values: getTestValueStream(0, 200, 20, testStartTime),
},
// Two histograms with 4 buckets each (*_sum and *_count not included,
// only buckets). Lowest bucket for one histogram < 0, for the other >
// 0. They have the same name, just separated by label. Not useful in
// practice, but can happen (if clients change bucketing), and the
// server has to cope with it.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "0.1",
"start": "positive",
},
},
Values: getTestValueStream(0, 50, 5, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": ".2",
"start": "positive",
},
},
Values: getTestValueStream(0, 70, 7, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "1e0",
"start": "positive",
},
},
Values: getTestValueStream(0, 110, 11, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "+Inf",
"start": "positive",
},
},
Values: getTestValueStream(0, 120, 12, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "-.2",
"start": "negative",
},
},
Values: getTestValueStream(0, 10, 1, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "-0.1",
"start": "negative",
},
},
Values: getTestValueStream(0, 20, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "0.3",
"start": "negative",
},
},
Values: getTestValueStream(0, 20, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testhistogram_bucket",
"le": "+Inf",
"start": "negative",
},
},
Values: getTestValueStream(0, 30, 3, testStartTime),
},
// Now a more realistic histogram per job and instance to test aggregation.
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins1",
"le": "0.1",
},
},
Values: getTestValueStream(0, 10, 1, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins1",
"le": "0.2",
},
},
Values: getTestValueStream(0, 30, 3, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins1",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 40, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins2",
"le": "0.1",
},
},
Values: getTestValueStream(0, 20, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins2",
"le": "0.2",
},
},
Values: getTestValueStream(0, 50, 5, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job1",
"instance": "ins2",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 60, 6, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins1",
"le": "0.1",
},
},
Values: getTestValueStream(0, 30, 3, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins1",
"le": "0.2",
},
},
Values: getTestValueStream(0, 40, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins1",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 60, 6, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins2",
"le": "0.1",
},
},
Values: getTestValueStream(0, 40, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins2",
"le": "0.2",
},
},
Values: getTestValueStream(0, 70, 7, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "request_duration_seconds_bucket",
clientmodel.JobLabel: "job2",
"instance": "ins2",
"le": "+Inf",
},
},
Values: getTestValueStream(0, 90, 9, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "vector_matching_a",
"l": "x",
},
},
Values: getTestValueStream(0, 100, 1, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "vector_matching_a",
"l": "y",
},
},
Values: getTestValueStream(0, 100, 2, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "vector_matching_b",
"l": "x",
},
},
Values: getTestValueStream(0, 100, 4, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "cpu_count",
"instance": "0",
"type": "numa",
},
},
Values: getTestValueStream(0, 500, 30, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "cpu_count",
"instance": "0",
"type": "smp",
},
},
Values: getTestValueStream(0, 200, 10, testStartTime),
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "cpu_count",
"instance": "1",
"type": "smp",
},
},
Values: getTestValueStream(0, 200, 20, testStartTime),
},
}