Use AST query analyzer and views with tiered storage.

This commit is contained in:
Julius Volz 2013-03-21 18:06:15 +01:00
parent 3c9d6cb66c
commit 8e4c5b0cea
12 changed files with 431 additions and 164 deletions

View file

@ -25,7 +25,7 @@ import (
// require it. // require it.
type ApplicationState struct { type ApplicationState struct {
Config *config.Config Config *config.Config
Persistence metric.MetricPersistence
RuleManager rules.RuleManager RuleManager rules.RuleManager
Storage metric.Storage
TargetManager retrieval.TargetManager TargetManager retrieval.TargetManager
} }

51
main.go
View file

@ -18,7 +18,6 @@ import (
"fmt" "fmt"
"github.com/prometheus/prometheus/appstate" "github.com/prometheus/prometheus/appstate"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
// "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/retrieval/format" "github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
@ -40,7 +39,6 @@ var (
scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.")
ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.") ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.")
concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.")
memoryArena = flag.Bool("experimental.useMemoryArena", false, "Use in-memory timeseries arena.")
) )
func main() { func main() {
@ -50,33 +48,16 @@ func main() {
log.Fatalf("Error loading configuration from %s: %v", *configFile, err) log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
} }
var ( ts := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
persistence metric.MetricPersistence
ts metric.Storage
)
if *memoryArena {
persistence = metric.NewMemorySeriesStorage()
} else {
ts = metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
go ts.Serve() go ts.Serve()
// persistence, err = metric.NewLevelDBMetricPersistence(*metricsStoragePath)
// if err != nil {
// log.Fatalf("Error opening storage: %v", err)
// }
}
go func() { go func() {
notifier := make(chan os.Signal) notifier := make(chan os.Signal)
signal.Notify(notifier, os.Interrupt) signal.Notify(notifier, os.Interrupt)
<-notifier <-notifier
// persistence.Close() ts.Close()
os.Exit(0) os.Exit(0)
}() }()
// defer persistence.Close()
// Queue depth will need to be exposed // Queue depth will need to be exposed
scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity)
@ -85,7 +66,8 @@ func main() {
ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity)
ast.SetPersistence(persistence, nil) ast.SetStorage(ts)
ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval) ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval)
err = ruleManager.AddRulesFromConfig(conf) err = ruleManager.AddRulesFromConfig(conf)
if err != nil { if err != nil {
@ -94,45 +76,22 @@ func main() {
appState := &appstate.ApplicationState{ appState := &appstate.ApplicationState{
Config: conf, Config: conf,
Persistence: persistence,
RuleManager: ruleManager, RuleManager: ruleManager,
Storage: ts,
TargetManager: targetManager, TargetManager: targetManager,
} }
web.StartServing(appState) web.StartServing(appState)
// go func() {
// ticker := time.Tick(time.Second)
// for i := 0; i < 120; i++ {
// <-ticker
// if i%10 == 0 {
// fmt.Printf(".")
// }
// }
// fmt.Println()
// //f := model.NewFingerprintFromRowKey("9776005627788788740-g-131-0")
// f := model.NewFingerprintFromRowKey("09923616460706181007-g-131-0")
// v := metric.NewViewRequestBuilder()
// v.GetMetricAtTime(f, time.Now().Add(-120*time.Second))
// view, err := ts.MakeView(v, time.Minute)
// fmt.Println(view, err)
// }()
for { for {
select { select {
case scrapeResult := <-scrapeResults: case scrapeResult := <-scrapeResults:
if scrapeResult.Err == nil { if scrapeResult.Err == nil {
// f := model.NewFingerprintFromMetric(scrapeResult.Sample.Metric)
// fmt.Println(f)
// persistence.AppendSample(scrapeResult.Sample)
ts.AppendSample(scrapeResult.Sample) ts.AppendSample(scrapeResult.Sample)
} }
case ruleResult := <-ruleResults: case ruleResult := <-ruleResults:
for _, sample := range ruleResult.Samples { for _, sample := range ruleResult.Samples {
// XXX: Wart
// persistence.AppendSample(*sample)
ts.AppendSample(*sample) ts.AppendSample(*sample)
} }
} }

View file

@ -90,29 +90,30 @@ const (
type Node interface { type Node interface {
Type() ExprType Type() ExprType
NodeTreeToDotGraph() string NodeTreeToDotGraph() string
Children() []Node
} }
// All node types implement one of the following interfaces. The name of the // All node types implement one of the following interfaces. The name of the
// interface represents the type returned to the parent node. // interface represents the type returned to the parent node.
type ScalarNode interface { type ScalarNode interface {
Node Node
Eval(timestamp *time.Time) model.SampleValue Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue
} }
type VectorNode interface { type VectorNode interface {
Node Node
Eval(timestamp *time.Time) Vector Eval(timestamp *time.Time, view *viewAdapter) Vector
} }
type MatrixNode interface { type MatrixNode interface {
Node Node
Eval(timestamp *time.Time) Matrix Eval(timestamp *time.Time, view *viewAdapter) Matrix
EvalBoundaries(timestamp *time.Time) Matrix EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix
} }
type StringNode interface { type StringNode interface {
Node Node
Eval(timestamp *time.Time) string Eval(timestamp *time.Time, view *viewAdapter) string
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -198,6 +199,7 @@ type (
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Implementations. // Implementations.
// Node.Type() methods.
func (node ScalarLiteral) Type() ExprType { return SCALAR } func (node ScalarLiteral) Type() ExprType { return SCALAR }
func (node ScalarFunctionCall) Type() ExprType { return SCALAR } func (node ScalarFunctionCall) Type() ExprType { return SCALAR }
func (node ScalarArithExpr) Type() ExprType { return SCALAR } func (node ScalarArithExpr) Type() ExprType { return SCALAR }
@ -209,18 +211,30 @@ func (node MatrixLiteral) Type() ExprType { return MATRIX }
func (node StringLiteral) Type() ExprType { return STRING } func (node StringLiteral) Type() ExprType { return STRING }
func (node StringFunctionCall) Type() ExprType { return STRING } func (node StringFunctionCall) Type() ExprType { return STRING }
func (node *ScalarLiteral) Eval(timestamp *time.Time) model.SampleValue { // Node.Children() methods.
func (node ScalarLiteral) Children() []Node { return []Node{} }
func (node ScalarFunctionCall) Children() []Node { return node.args }
func (node ScalarArithExpr) Children() []Node { return []Node{node.lhs, node.rhs} }
func (node VectorLiteral) Children() []Node { return []Node{} }
func (node VectorFunctionCall) Children() []Node { return node.args }
func (node VectorAggregation) Children() []Node { return []Node{node.vector} }
func (node VectorArithExpr) Children() []Node { return []Node{node.lhs, node.rhs} }
func (node MatrixLiteral) Children() []Node { return []Node{} }
func (node StringLiteral) Children() []Node { return []Node{} }
func (node StringFunctionCall) Children() []Node { return node.args }
func (node *ScalarLiteral) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue {
return node.value return node.value
} }
func (node *ScalarArithExpr) Eval(timestamp *time.Time) model.SampleValue { func (node *ScalarArithExpr) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue {
lhs := node.lhs.Eval(timestamp) lhs := node.lhs.Eval(timestamp, view)
rhs := node.rhs.Eval(timestamp) rhs := node.rhs.Eval(timestamp, view)
return evalScalarBinop(node.opType, lhs, rhs) return evalScalarBinop(node.opType, lhs, rhs)
} }
func (node *ScalarFunctionCall) Eval(timestamp *time.Time) model.SampleValue { func (node *ScalarFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue {
return node.function.callFn(timestamp, node.args).(model.SampleValue) return node.function.callFn(timestamp, view, node.args).(model.SampleValue)
} }
func (node *VectorAggregation) labelsToGroupingKey(labels model.Metric) string { func (node *VectorAggregation) labelsToGroupingKey(labels model.Metric) string {
@ -240,11 +254,25 @@ func labelsToKey(labels model.Metric) string {
return strings.Join(keyParts, ",") // TODO not safe when label value contains comma. return strings.Join(keyParts, ",") // TODO not safe when label value contains comma.
} }
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, step time.Duration) Matrix { func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector) {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp)
if err != nil {
// TODO: propagate errors.
return
}
return node.Eval(&timestamp, viewAdapter)
}
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) {
viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval)
if err != nil {
// TODO: propagate errors.
return
}
// TODO implement watchdog timer for long-running queries. // TODO implement watchdog timer for long-running queries.
sampleSets := map[string]*model.SampleSet{} sampleSets := map[string]*model.SampleSet{}
for t := start; t.Before(end); t = t.Add(step) { for t := start; t.Before(end); t = t.Add(interval) {
vector := node.Eval(&t) vector := node.Eval(&t, viewAdapter)
for _, sample := range vector { for _, sample := range vector {
samplePair := model.SamplePair{ samplePair := model.SamplePair{
Value: sample.Value, Value: sample.Value,
@ -262,11 +290,10 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, step time.
} }
} }
matrix := Matrix{}
for _, sampleSet := range sampleSets { for _, sampleSet := range sampleSets {
matrix = append(matrix, sampleSet) matrix = append(matrix, sampleSet)
} }
return matrix return
} }
func labelIntersection(metric1, metric2 model.Metric) model.Metric { func labelIntersection(metric1, metric2 model.Metric) model.Metric {
@ -295,8 +322,8 @@ func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[stri
return vector return vector
} }
func (node *VectorAggregation) Eval(timestamp *time.Time) Vector { func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vector {
vector := node.vector.Eval(timestamp) vector := node.vector.Eval(timestamp, view)
result := map[string]*groupedAggregation{} result := map[string]*groupedAggregation{}
for _, sample := range vector { for _, sample := range vector {
groupingKey := node.labelsToGroupingKey(sample.Metric) groupingKey := node.labelsToGroupingKey(sample.Metric)
@ -328,8 +355,8 @@ func (node *VectorAggregation) Eval(timestamp *time.Time) Vector {
return node.groupedAggregationsToVector(result, timestamp) return node.groupedAggregationsToVector(result, timestamp)
} }
func (node *VectorLiteral) Eval(timestamp *time.Time) Vector { func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector {
values, err := persistenceAdapter.GetValueAtTime(node.labels, timestamp) values, err := view.GetValueAtTime(node.labels, timestamp)
if err != nil { if err != nil {
log.Printf("Unable to get vector values") log.Printf("Unable to get vector values")
return Vector{} return Vector{}
@ -337,8 +364,8 @@ func (node *VectorLiteral) Eval(timestamp *time.Time) Vector {
return values return values
} }
func (node *VectorFunctionCall) Eval(timestamp *time.Time) Vector { func (node *VectorFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) Vector {
return node.function.callFn(timestamp, node.args).(Vector) return node.function.callFn(timestamp, view, node.args).(Vector)
} }
func evalScalarBinop(opType BinOpType, func evalScalarBinop(opType BinOpType,
@ -481,11 +508,11 @@ func labelsEqual(labels1, labels2 model.Metric) bool {
return true return true
} }
func (node *VectorArithExpr) Eval(timestamp *time.Time) Vector { func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vector {
lhs := node.lhs.Eval(timestamp) lhs := node.lhs.Eval(timestamp, view)
result := Vector{} result := Vector{}
if node.rhs.Type() == SCALAR { if node.rhs.Type() == SCALAR {
rhs := node.rhs.(ScalarNode).Eval(timestamp) rhs := node.rhs.(ScalarNode).Eval(timestamp, view)
for _, lhsSample := range lhs { for _, lhsSample := range lhs {
value, keep := evalVectorBinop(node.opType, lhsSample.Value, rhs) value, keep := evalVectorBinop(node.opType, lhsSample.Value, rhs)
if keep { if keep {
@ -495,7 +522,7 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time) Vector {
} }
return result return result
} else if node.rhs.Type() == VECTOR { } else if node.rhs.Type() == VECTOR {
rhs := node.rhs.(VectorNode).Eval(timestamp) rhs := node.rhs.(VectorNode).Eval(timestamp, view)
for _, lhsSample := range lhs { for _, lhsSample := range lhs {
for _, rhsSample := range rhs { for _, rhsSample := range rhs {
if labelsEqual(lhsSample.Metric, rhsSample.Metric) { if labelsEqual(lhsSample.Metric, rhsSample.Metric) {
@ -512,12 +539,12 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time) Vector {
panic("Invalid vector arithmetic expression operands") panic("Invalid vector arithmetic expression operands")
} }
func (node *MatrixLiteral) Eval(timestamp *time.Time) Matrix { func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix {
interval := &model.Interval{ interval := &model.Interval{
OldestInclusive: timestamp.Add(-node.interval), OldestInclusive: timestamp.Add(-node.interval),
NewestInclusive: *timestamp, NewestInclusive: *timestamp,
} }
values, err := persistenceAdapter.GetRangeValues(node.labels, interval) values, err := view.GetRangeValues(node.labels, interval)
if err != nil { if err != nil {
log.Printf("Unable to get values for vector interval") log.Printf("Unable to get values for vector interval")
return Matrix{} return Matrix{}
@ -525,12 +552,12 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time) Matrix {
return values return values
} }
func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time) Matrix { func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix {
interval := &model.Interval{ interval := &model.Interval{
OldestInclusive: timestamp.Add(-node.interval), OldestInclusive: timestamp.Add(-node.interval),
NewestInclusive: *timestamp, NewestInclusive: *timestamp,
} }
values, err := persistenceAdapter.GetBoundaryValues(node.labels, interval) values, err := view.GetBoundaryValues(node.labels, interval)
if err != nil { if err != nil {
log.Printf("Unable to get boundary values for vector interval") log.Printf("Unable to get boundary values for vector interval")
return Matrix{} return Matrix{}
@ -550,12 +577,12 @@ func (matrix Matrix) Swap(i, j int) {
matrix[i], matrix[j] = matrix[j], matrix[i] matrix[i], matrix[j] = matrix[j], matrix[i]
} }
func (node *StringLiteral) Eval(timestamp *time.Time) string { func (node *StringLiteral) Eval(timestamp *time.Time, view *viewAdapter) string {
return node.str return node.str
} }
func (node *StringFunctionCall) Eval(timestamp *time.Time) string { func (node *StringFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) string {
return node.function.callFn(timestamp, node.args).(string) return node.function.callFn(timestamp, view, node.args).(string)
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------

View file

@ -24,7 +24,7 @@ type Function struct {
name string name string
argTypes []ExprType argTypes []ExprType
returnType ExprType returnType ExprType
callFn func(timestamp *time.Time, args []Node) interface{} callFn func(timestamp *time.Time, view *viewAdapter, args []Node) interface{}
} }
func (function *Function) CheckArgTypes(args []Node) error { func (function *Function) CheckArgTypes(args []Node) error {
@ -63,19 +63,19 @@ func (function *Function) CheckArgTypes(args []Node) error {
} }
// === time() === // === time() ===
func timeImpl(timestamp *time.Time, args []Node) interface{} { func timeImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
return model.SampleValue(time.Now().Unix()) return model.SampleValue(time.Now().Unix())
} }
// === count(vector VectorNode) === // === count(vector VectorNode) ===
func countImpl(timestamp *time.Time, args []Node) interface{} { func countImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp))) return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view)))
} }
// === delta(matrix MatrixNode, isCounter ScalarNode) === // === delta(matrix MatrixNode, isCounter ScalarNode) ===
func deltaImpl(timestamp *time.Time, args []Node) interface{} { func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
matrixNode := args[0].(MatrixNode) matrixNode := args[0].(MatrixNode)
isCounter := int(args[1].(ScalarNode).Eval(timestamp)) isCounter := int(args[1].(ScalarNode).Eval(timestamp, view))
resultVector := Vector{} resultVector := Vector{}
// If we treat these metrics as counters, we need to fetch all values // If we treat these metrics as counters, we need to fetch all values
@ -83,9 +83,9 @@ func deltaImpl(timestamp *time.Time, args []Node) interface{} {
// I.e. if a counter resets, we want to ignore that reset. // I.e. if a counter resets, we want to ignore that reset.
var matrixValue Matrix var matrixValue Matrix
if isCounter > 0 { if isCounter > 0 {
matrixValue = matrixNode.Eval(timestamp) matrixValue = matrixNode.Eval(timestamp, view)
} else { } else {
matrixValue = matrixNode.EvalBoundaries(timestamp) matrixValue = matrixNode.EvalBoundaries(timestamp, view)
} }
for _, samples := range matrixValue { for _, samples := range matrixValue {
counterCorrection := model.SampleValue(0) counterCorrection := model.SampleValue(0)
@ -109,9 +109,9 @@ func deltaImpl(timestamp *time.Time, args []Node) interface{} {
} }
// === rate(node *MatrixNode) === // === rate(node *MatrixNode) ===
func rateImpl(timestamp *time.Time, args []Node) interface{} { func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
args = append(args, &ScalarLiteral{value: 1}) args = append(args, &ScalarLiteral{value: 1})
vector := deltaImpl(timestamp, args).(Vector) vector := deltaImpl(timestamp, view, args).(Vector)
// TODO: could be other type of MatrixNode in the future (right now, only // TODO: could be other type of MatrixNode in the future (right now, only
// MatrixLiteral exists). Find a better way of getting the duration of a // MatrixLiteral exists). Find a better way of getting the duration of a
@ -124,7 +124,7 @@ func rateImpl(timestamp *time.Time, args []Node) interface{} {
} }
// === sampleVectorImpl() === // === sampleVectorImpl() ===
func sampleVectorImpl(timestamp *time.Time, args []Node) interface{} { func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
return Vector{ return Vector{
&model.Sample{ &model.Sample{
Metric: model.Metric{ Metric: model.Metric{

View file

@ -22,85 +22,131 @@ import (
var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.") var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.")
type PersistenceAdapter struct { // AST-global storage to use for operations that are not supported by views
persistence metric.MetricPersistence // (i.e. metric->fingerprint lookups).
var queryStorage metric.Storage = nil
type viewAdapter struct {
view metric.View
// TODO: use this.
stalenessPolicy *metric.StalenessPolicy stalenessPolicy *metric.StalenessPolicy
} }
// AST-global persistence to use. func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) {
var persistenceAdapter *PersistenceAdapter = nil var minDelta time.Duration
for _, candidate := range samples {
func (p *PersistenceAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) { // Ignore samples outside of staleness policy window.
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels) delta := candidate.Timestamp.Sub(*timestamp)
if err != nil { if delta < 0 {
return delta = -delta
} }
if delta > v.stalenessPolicy.DeltaAllowance {
for _, fingerprint := range fingerprints {
var sample *model.Sample // Don't shadow err.
sample, err = p.persistence.GetValueAtTime(fingerprint, *timestamp, *p.stalenessPolicy)
if err != nil {
return
}
if sample == nil {
continue continue
} }
samples = append(samples, sample)
// Skip sample if we've seen a closer one before this.
if sample != nil {
if delta > minDelta {
continue
}
}
sample = &candidate
minDelta = delta
} }
return return
} }
func (p *PersistenceAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) {
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels) fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels)
if err != nil {
return
}
for _, fingerprint := range fingerprints {
sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp)
samplePair := v.chooseClosestSample(sampleCandidates, timestamp)
m, err := queryStorage.GetMetricForFingerprint(fingerprint)
if err != nil {
continue
}
if samplePair != nil {
samples = append(samples, &model.Sample{
Metric: *m,
Value: samplePair.Value,
Timestamp: *timestamp,
})
}
}
return
}
func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) {
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels)
if err != nil { if err != nil {
return return
} }
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
var sampleSet *model.SampleSet // Don't shadow err.
// TODO: change to GetBoundaryValues() once it has the right return type. // TODO: change to GetBoundaryValues() once it has the right return type.
sampleSet, err = p.persistence.GetRangeValues(fingerprint, *interval) samplePairs := v.view.GetRangeValues(fingerprint, *interval)
if err != nil { if samplePairs == nil {
return nil, err
}
if sampleSet == nil {
continue continue
} }
// TODO: memoize/cache this.
m, err := queryStorage.GetMetricForFingerprint(fingerprint)
if err != nil {
continue
}
sampleSet := &model.SampleSet{
Metric: *m,
Values: samplePairs,
}
sampleSets = append(sampleSets, sampleSet) sampleSets = append(sampleSets, sampleSet)
} }
return sampleSets, nil return sampleSets, nil
} }
func (p *PersistenceAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) {
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels) fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels)
if err != nil { if err != nil {
return return
} }
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
var sampleSet *model.SampleSet // Don't shadow err. samplePairs := v.view.GetRangeValues(fingerprint, *interval)
sampleSet, err = p.persistence.GetRangeValues(fingerprint, *interval) if samplePairs == nil {
if err != nil {
return nil, err
}
if sampleSet == nil {
continue continue
} }
// TODO: memoize/cache this.
m, err := queryStorage.GetMetricForFingerprint(fingerprint)
if err != nil {
continue
}
sampleSet := &model.SampleSet{
Metric: *m,
Values: samplePairs,
}
sampleSets = append(sampleSets, sampleSet) sampleSets = append(sampleSets, sampleSet)
} }
return sampleSets, nil return sampleSets, nil
} }
func SetPersistence(persistence metric.MetricPersistence, policy *metric.StalenessPolicy) { func SetStorage(storage metric.Storage) {
if policy == nil { queryStorage = storage
policy = &metric.StalenessPolicy{ }
func NewViewAdapter(view metric.View) *viewAdapter {
stalenessPolicy := metric.StalenessPolicy{
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
} }
}
persistenceAdapter = &PersistenceAdapter{ return &viewAdapter{
persistence: persistence, view: view,
stalenessPolicy: policy, stalenessPolicy: &stalenessPolicy,
} }
} }

View file

@ -152,9 +152,14 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
} }
func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
viewAdapter, err := viewAdapterForInstantQuery(node, *timestamp)
if err != nil {
panic(err)
}
switch node.Type() { switch node.Type() {
case SCALAR: case SCALAR:
scalar := node.(ScalarNode).Eval(timestamp) scalar := node.(ScalarNode).Eval(timestamp, viewAdapter)
switch format { switch format {
case TEXT: case TEXT:
return fmt.Sprintf("scalar: %v", scalar) return fmt.Sprintf("scalar: %v", scalar)
@ -162,7 +167,7 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
return TypedValueToJSON(scalar, "scalar") return TypedValueToJSON(scalar, "scalar")
} }
case VECTOR: case VECTOR:
vector := node.(VectorNode).Eval(timestamp) vector := node.(VectorNode).Eval(timestamp, viewAdapter)
switch format { switch format {
case TEXT: case TEXT:
return vector.String() return vector.String()
@ -170,7 +175,7 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
return TypedValueToJSON(vector, "vector") return TypedValueToJSON(vector, "vector")
} }
case MATRIX: case MATRIX:
matrix := node.(MatrixNode).Eval(timestamp) matrix := node.(MatrixNode).Eval(timestamp, viewAdapter)
switch format { switch format {
case TEXT: case TEXT:
return matrix.String() return matrix.String()
@ -178,7 +183,7 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
return TypedValueToJSON(matrix, "matrix") return TypedValueToJSON(matrix, "matrix")
} }
case STRING: case STRING:
str := node.(StringNode).Eval(timestamp) str := node.(StringNode).Eval(timestamp, viewAdapter)
switch format { switch format {
case TEXT: case TEXT:
return str return str

141
rules/ast/query_analyzer.go Normal file
View file

@ -0,0 +1,141 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ast
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/metric"
"log"
"time"
)
type FullRangeMap map[model.Fingerprint]time.Duration
type IntervalRangeMap map[model.Fingerprint]bool
type QueryAnalyzer struct {
// Values collected by query analysis.
//
// Full ranges always implicitly span a time range of:
// - start: query interval start - duration
// - end: query interval end
//
// This is because full ranges can only result from matrix literals (like
// "foo[5m]"), which have said time-spanning behavior during a ranged query.
FullRanges FullRangeMap
// Interval ranges always implicitly span the whole query interval.
IntervalRanges IntervalRangeMap
}
func NewQueryAnalyzer() *QueryAnalyzer {
return &QueryAnalyzer{
FullRanges: FullRangeMap{},
IntervalRanges: IntervalRangeMap{},
}
}
func minTime(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t1
}
return t2
}
func maxTime(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}
func (analyzer *QueryAnalyzer) Visit(node Node) {
switch n := node.(type) {
case *VectorLiteral:
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels)
if err != nil {
log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err)
return
}
for _, fingerprint := range fingerprints {
if !analyzer.IntervalRanges[fingerprint] {
analyzer.IntervalRanges[fingerprint] = true
}
}
case *MatrixLiteral:
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels)
if err != nil {
log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err)
return
}
for _, fingerprint := range fingerprints {
interval := n.interval
// If an interval has already been recorded for this fingerprint, merge
// it with the current interval.
if oldInterval, ok := analyzer.FullRanges[fingerprint]; ok {
if oldInterval > interval {
interval = oldInterval
}
}
analyzer.FullRanges[fingerprint] = interval
}
}
}
func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
Walk(analyzer, node)
// Find and dedupe overlaps between full and stepped ranges. Full ranges
// always contain more points *and* span more time than stepped ranges, so
// throw away stepped ranges for fingerprints which have full ranges.
for fingerprint := range analyzer.FullRanges {
if analyzer.IntervalRanges[fingerprint] {
delete(analyzer.IntervalRanges, fingerprint)
}
}
}
func viewAdapterForInstantQuery(node Node, timestamp time.Time) (viewAdapter *viewAdapter, err error) {
analyzer := NewQueryAnalyzer()
analyzer.AnalyzeQueries(node)
viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
viewBuilder.GetMetricRange(fingerprint, timestamp.Add(-rangeDuration), timestamp)
}
for fingerprint := range analyzer.IntervalRanges {
viewBuilder.GetMetricAtTime(fingerprint, timestamp)
}
view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second)
if err == nil {
viewAdapter = NewViewAdapter(view)
}
return
}
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration) (viewAdapter *viewAdapter, err error) {
analyzer := NewQueryAnalyzer()
analyzer.AnalyzeQueries(node)
viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
// TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder.
for t := start; t.Before(end); t = t.Add(interval) {
viewBuilder.GetMetricRange(fingerprint, t.Add(-rangeDuration), t)
}
}
for fingerprint := range analyzer.IntervalRanges {
viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval)
}
view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second)
if err == nil {
viewAdapter = NewViewAdapter(view)
}
return
}

27
rules/ast/walk.go Normal file
View file

@ -0,0 +1,27 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ast
type Visitor interface {
Visit(node Node)
}
// Walk() does a depth-first traversal of the AST, calling visitor.Visit() for
// each encountered node in the tree.
func Walk(visitor Visitor, node Node) {
visitor.Visit(node)
for _, childNode := range node.Children() {
Walk(visitor, childNode)
}
}

View file

@ -30,8 +30,8 @@ type Rule struct {
func (rule *Rule) Name() string { return rule.name } func (rule *Rule) Name() string { return rule.name }
func (rule *Rule) EvalRaw(timestamp *time.Time) ast.Vector { func (rule *Rule) EvalRaw(timestamp *time.Time) (vector ast.Vector) {
return rule.vector.Eval(timestamp) return ast.EvalVectorInstant(rule.vector, *timestamp)
} }
func (rule *Rule) Eval(timestamp *time.Time) ast.Vector { func (rule *Rule) Eval(timestamp *time.Time) ast.Vector {

View file

@ -21,6 +21,7 @@ import (
"os" "os"
"strings" "strings"
"testing" "testing"
"time"
) )
var testEvalTime = testStartTime.Add(testDuration5m * 10) var testEvalTime = testStartTime.Add(testDuration5m * 10)
@ -31,16 +32,22 @@ var expressionTests = []struct {
expr string expr string
output []string output []string
shouldFail bool shouldFail bool
fullRanges int
intervalRanges int
}{ }{
{ {
expr: "SUM(http_requests)", expr: "SUM(http_requests)",
output: []string{"http_requests{} => 3600 @[%v]"}, output: []string{"http_requests{} => 3600 @[%v]"},
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job)", expr: "SUM(http_requests) BY (job)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 1000 @[%v]", "http_requests{job='api-server'} => 1000 @[%v]",
"http_requests{job='app-server'} => 2600 @[%v]", "http_requests{job='app-server'} => 2600 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job, group)", expr: "SUM(http_requests) BY (job, group)",
output: []string{ output: []string{
@ -49,74 +56,116 @@ var expressionTests = []struct {
"http_requests{group='production',job='api-server'} => 300 @[%v]", "http_requests{group='production',job='api-server'} => 300 @[%v]",
"http_requests{group='production',job='app-server'} => 1100 @[%v]", "http_requests{group='production',job='app-server'} => 1100 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "AVG(http_requests) BY (job)", expr: "AVG(http_requests) BY (job)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 250 @[%v]", "http_requests{job='api-server'} => 250 @[%v]",
"http_requests{job='app-server'} => 650 @[%v]", "http_requests{job='app-server'} => 650 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "MIN(http_requests) BY (job)", expr: "MIN(http_requests) BY (job)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 100 @[%v]", "http_requests{job='api-server'} => 100 @[%v]",
"http_requests{job='app-server'} => 500 @[%v]", "http_requests{job='app-server'} => 500 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "MAX(http_requests) BY (job)", expr: "MAX(http_requests) BY (job)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 400 @[%v]", "http_requests{job='api-server'} => 400 @[%v]",
"http_requests{job='app-server'} => 800 @[%v]", "http_requests{job='app-server'} => 800 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) - count(http_requests)", expr: "SUM(http_requests) BY (job) - count(http_requests)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 992 @[%v]", "http_requests{job='api-server'} => 992 @[%v]",
"http_requests{job='app-server'} => 2592 @[%v]", "http_requests{job='app-server'} => 2592 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) - 2", expr: "SUM(http_requests) BY (job) - 2",
output: []string{ output: []string{
"http_requests{job='api-server'} => 998 @[%v]", "http_requests{job='api-server'} => 998 @[%v]",
"http_requests{job='app-server'} => 2598 @[%v]", "http_requests{job='app-server'} => 2598 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) % 3", expr: "SUM(http_requests) BY (job) % 3",
output: []string{ output: []string{
"http_requests{job='api-server'} => 1 @[%v]", "http_requests{job='api-server'} => 1 @[%v]",
"http_requests{job='app-server'} => 2 @[%v]", "http_requests{job='app-server'} => 2 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) / 0", expr: "SUM(http_requests) BY (job) / 0",
output: []string{ output: []string{
"http_requests{job='api-server'} => +Inf @[%v]", "http_requests{job='api-server'} => +Inf @[%v]",
"http_requests{job='app-server'} => +Inf @[%v]", "http_requests{job='app-server'} => +Inf @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) > 1000", expr: "SUM(http_requests) BY (job) > 1000",
output: []string{ output: []string{
"http_requests{job='app-server'} => 2600 @[%v]", "http_requests{job='app-server'} => 2600 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) <= 1000", expr: "SUM(http_requests) BY (job) <= 1000",
output: []string{ output: []string{
"http_requests{job='api-server'} => 1000 @[%v]", "http_requests{job='api-server'} => 1000 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) != 1000", expr: "SUM(http_requests) BY (job) != 1000",
output: []string{ output: []string{
"http_requests{job='app-server'} => 2600 @[%v]", "http_requests{job='app-server'} => 2600 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) == 1000", expr: "SUM(http_requests) BY (job) == 1000",
output: []string{ output: []string{
"http_requests{job='api-server'} => 1000 @[%v]", "http_requests{job='api-server'} => 1000 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) + SUM(http_requests) BY (job)", expr: "SUM(http_requests) BY (job) + SUM(http_requests) BY (job)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 2000 @[%v]", "http_requests{job='api-server'} => 2000 @[%v]",
"http_requests{job='app-server'} => 5200 @[%v]", "http_requests{job='app-server'} => 5200 @[%v]",
}, },
fullRanges: 0,
intervalRanges: 8,
}, {
expr: "http_requests{job='api-server', group='canary'}",
output: []string{
"http_requests{group='canary',instance='0',job='api-server'} => 300 @[%v]",
"http_requests{group='canary',instance='1',job='api-server'} => 400 @[%v]",
},
fullRanges: 0,
intervalRanges: 2,
}, {
expr: "http_requests{job='api-server', group='canary'} + delta(http_requests{job='api-server'}[5m], 1)",
output: []string{
"http_requests{group='canary',instance='0',job='api-server'} => 330 @[%v]",
"http_requests{group='canary',instance='1',job='api-server'} => 440 @[%v]",
},
fullRanges: 4,
intervalRanges: 0,
}, { }, {
expr: "delta(http_requests[25m], 1)", expr: "delta(http_requests[25m], 1)",
output: []string{ output: []string{
@ -129,6 +178,8 @@ var expressionTests = []struct {
"http_requests{group='production',instance='1',job='api-server'} => 100 @[%v]", "http_requests{group='production',instance='1',job='api-server'} => 100 @[%v]",
"http_requests{group='production',instance='1',job='app-server'} => 300 @[%v]", "http_requests{group='production',instance='1',job='app-server'} => 300 @[%v]",
}, },
fullRanges: 8,
intervalRanges: 0,
// Invalid expressions that should fail to parse. // Invalid expressions that should fail to parse.
}, { }, {
expr: "", expr: "",
@ -162,31 +213,24 @@ func vectorComparisonString(expected []string, actual []string) string {
} }
func TestExpressions(t *testing.T) { func TestExpressions(t *testing.T) {
temporaryDirectory, err := ioutil.TempDir("", "leveldb_metric_persistence_test") temporaryDirectory, err := ioutil.TempDir("", "rule_expression_tests")
if err != nil { if err != nil {
t.Errorf("Could not create temporary directory: %q\n", err) t.Errorf("Could not create temporary directory: %q\n", err)
return return
} }
tieredStorage := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, temporaryDirectory)
go tieredStorage.Serve()
defer func() { defer func() {
tieredStorage.Close()
if err = os.RemoveAll(temporaryDirectory); err != nil { if err = os.RemoveAll(temporaryDirectory); err != nil {
t.Errorf("Could not remove temporary directory: %q\n", err) t.Errorf("Could not remove temporary directory: %q\n", err)
} }
}() }()
persistence, err := metric.NewLevelDBMetricPersistence(temporaryDirectory)
if err != nil {
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err)
return
}
if persistence == nil {
t.Errorf("Received nil LevelDB Metric Persistence.\n")
return
}
defer func() {
persistence.Close()
}()
storeMatrix(persistence, testMatrix) ast.SetStorage(tieredStorage)
ast.SetPersistence(persistence, nil)
storeMatrix(tieredStorage, testMatrix)
tieredStorage.Flush()
for _, exprTest := range expressionTests { for _, exprTest := range expressionTests {
expectedLines := annotateWithTime(exprTest.output) expectedLines := annotateWithTime(exprTest.output)
@ -200,6 +244,9 @@ func TestExpressions(t *testing.T) {
t.Errorf("Error during parsing: %v", err) t.Errorf("Error during parsing: %v", err)
t.Errorf("Expression: %v", exprTest.expr) t.Errorf("Expression: %v", exprTest.expr)
} else { } else {
if exprTest.shouldFail {
t.Errorf("Test should fail, but didn't")
}
failed := false failed := false
resultStr := ast.EvalToString(testExpr, &testEvalTime, ast.TEXT) resultStr := ast.EvalToString(testExpr, &testEvalTime, ast.TEXT)
resultLines := strings.Split(resultStr, "\n") resultLines := strings.Split(resultStr, "\n")
@ -221,6 +268,18 @@ func TestExpressions(t *testing.T) {
failed = true failed = true
} }
} }
analyzer := ast.NewQueryAnalyzer()
analyzer.AnalyzeQueries(testExpr)
if exprTest.fullRanges != len(analyzer.FullRanges) {
t.Errorf("Count of full ranges didn't match: %v vs %v", exprTest.fullRanges, len(analyzer.FullRanges))
failed = true
}
if exprTest.intervalRanges != len(analyzer.IntervalRanges) {
t.Errorf("Count of stepped ranges didn't match: %v vs %v", exprTest.intervalRanges, len(analyzer.IntervalRanges))
failed = true
}
if failed { if failed {
t.Errorf("Expression: %v\n%v", exprTest.expr, vectorComparisonString(expectedLines, resultLines)) t.Errorf("Expression: %v\n%v", exprTest.expr, vectorComparisonString(expectedLines, resultLines))
} }

View file

@ -51,10 +51,10 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
return vector return vector
} }
func storeMatrix(persistence metric.MetricPersistence, matrix ast.Matrix) error { func storeMatrix(storage metric.Storage, matrix ast.Matrix) error {
for _, sampleSet := range matrix { for _, sampleSet := range matrix {
for _, sample := range sampleSet.Values { for _, sample := range sampleSet.Values {
err := persistence.AppendSample(model.Sample{ err := storage.AppendSample(model.Sample{
Metric: sampleSet.Metric, Metric: sampleSet.Metric,
Value: sample.Value, Value: sample.Value,
Timestamp: sample.Timestamp, Timestamp: sample.Timestamp,

View file

@ -72,18 +72,21 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st
// Align the start to step "tick" boundary. // Align the start to step "tick" boundary.
end -= end % step end -= end % step
matrix := ast.EvalVectorRange( matrix, err := ast.EvalVectorRange(
exprNode.(ast.VectorNode), exprNode.(ast.VectorNode),
time.Unix(end-duration, 0), time.Unix(end-duration, 0),
time.Unix(end, 0), time.Unix(end, 0),
time.Duration(step)*time.Second) time.Duration(step)*time.Second)
if err != nil {
return ast.ErrorToJSON(err)
}
sort.Sort(matrix) sort.Sort(matrix)
return ast.TypedValueToJSON(matrix, "matrix") return ast.TypedValueToJSON(matrix, "matrix")
} }
func (serv MetricsService) Metrics() string { func (serv MetricsService) Metrics() string {
metricNames, err := serv.appState.Persistence.GetAllMetricNames() metricNames, err := serv.appState.Storage.GetAllMetricNames()
rb := serv.ResponseBuilder() rb := serv.ResponseBuilder()
rb.SetContentType(gorest.Application_Json) rb.SetContentType(gorest.Application_Json)
if err != nil { if err != nil {