mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Cleanups to rules/persistence adapter code.
This commit is contained in:
parent
619aa97025
commit
c049ae39af
2
main.go
2
main.go
|
@ -65,7 +65,7 @@ func main() {
|
||||||
|
|
||||||
ruleResults := make(chan *rules.Result, 4096)
|
ruleResults := make(chan *rules.Result, 4096)
|
||||||
|
|
||||||
ast.SetPersistence(persistence)
|
ast.SetPersistence(persistence, nil)
|
||||||
ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval)
|
ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval)
|
||||||
err = ruleManager.AddRulesFromConfig(conf)
|
err = ruleManager.AddRulesFromConfig(conf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -316,7 +316,7 @@ func (node *VectorAggregation) Eval(timestamp *time.Time) Vector {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (node *VectorLiteral) Eval(timestamp *time.Time) Vector {
|
func (node *VectorLiteral) Eval(timestamp *time.Time) Vector {
|
||||||
values, err := persistence.GetValueAtTime(node.labels, timestamp, &stalenessPolicy)
|
values, err := persistenceAdapter.GetValueAtTime(node.labels, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Unable to get vector values")
|
log.Printf("Unable to get vector values")
|
||||||
return Vector{}
|
return Vector{}
|
||||||
|
@ -504,7 +504,7 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time) Matrix {
|
||||||
OldestInclusive: timestamp.Add(-node.interval),
|
OldestInclusive: timestamp.Add(-node.interval),
|
||||||
NewestInclusive: *timestamp,
|
NewestInclusive: *timestamp,
|
||||||
}
|
}
|
||||||
values, err := persistence.GetRangeValues(node.labels, interval, &stalenessPolicy)
|
values, err := persistenceAdapter.GetRangeValues(node.labels, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Unable to get values for vector interval")
|
log.Printf("Unable to get values for vector interval")
|
||||||
return Matrix{}
|
return Matrix{}
|
||||||
|
@ -517,7 +517,7 @@ func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time) Matrix {
|
||||||
OldestInclusive: timestamp.Add(-node.interval),
|
OldestInclusive: timestamp.Add(-node.interval),
|
||||||
NewestInclusive: *timestamp,
|
NewestInclusive: *timestamp,
|
||||||
}
|
}
|
||||||
values, err := persistence.GetBoundaryValues(node.labels, interval, &stalenessPolicy)
|
values, err := persistenceAdapter.GetBoundaryValues(node.labels, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Unable to get boundary values for vector interval")
|
log.Printf("Unable to get boundary values for vector interval")
|
||||||
return Matrix{}
|
return Matrix{}
|
||||||
|
|
|
@ -1,32 +1,23 @@
|
||||||
package ast
|
package ast
|
||||||
|
|
||||||
//////////
|
|
||||||
// TEMPORARY CRAP FILE IN LIEU OF MISSING FUNCTIONALITY IN STORAGE LAYER
|
|
||||||
//
|
|
||||||
// REMOVE!
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"github.com/matttproud/prometheus/model"
|
"github.com/matttproud/prometheus/model"
|
||||||
"github.com/matttproud/prometheus/storage/metric"
|
"github.com/matttproud/prometheus/storage/metric"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO ask matt about using pointers in nested metric structs
|
var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.")
|
||||||
|
|
||||||
// TODO move this somewhere proper
|
type PersistenceAdapter struct {
|
||||||
var stalenessPolicy = metric.StalenessPolicy{
|
|
||||||
DeltaAllowance: time.Duration(300) * time.Second,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO remove PersistenceBridge temporary helper.
|
|
||||||
type PersistenceBridge struct {
|
|
||||||
persistence metric.MetricPersistence
|
persistence metric.MetricPersistence
|
||||||
|
stalenessPolicy *metric.StalenessPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
// AST-global persistence to use.
|
// AST-global persistence to use.
|
||||||
var persistence *PersistenceBridge = nil
|
var persistenceAdapter *PersistenceAdapter = nil
|
||||||
|
|
||||||
func (p *PersistenceBridge) getMetricsWithLabels(labels model.LabelSet) ([]*model.Metric, error) {
|
func (p *PersistenceAdapter) getMetricsWithLabels(labels model.LabelSet) ([]*model.Metric, error) {
|
||||||
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(&labels)
|
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(&labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -42,14 +33,14 @@ func (p *PersistenceBridge) getMetricsWithLabels(labels model.LabelSet) ([]*mode
|
||||||
return metrics, nil
|
return metrics, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PersistenceBridge) GetValueAtTime(labels model.LabelSet, timestamp *time.Time, stalenessPolicy *metric.StalenessPolicy) ([]*model.Sample, error) {
|
func (p *PersistenceAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) ([]*model.Sample, error) {
|
||||||
metrics, err := p.getMetricsWithLabels(labels)
|
metrics, err := p.getMetricsWithLabels(labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
samples := []*model.Sample{}
|
samples := []*model.Sample{}
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
sample, err := p.persistence.GetValueAtTime(metric, timestamp, stalenessPolicy)
|
sample, err := p.persistence.GetValueAtTime(metric, timestamp, p.stalenessPolicy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -61,7 +52,7 @@ func (p *PersistenceBridge) GetValueAtTime(labels model.LabelSet, timestamp *tim
|
||||||
return samples, nil
|
return samples, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PersistenceBridge) GetBoundaryValues(labels model.LabelSet, interval *model.Interval, stalenessPolicy *metric.StalenessPolicy) ([]*model.SampleSet, error) {
|
func (p *PersistenceAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) ([]*model.SampleSet, error) {
|
||||||
metrics, err := p.getMetricsWithLabels(labels)
|
metrics, err := p.getMetricsWithLabels(labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -70,7 +61,7 @@ func (p *PersistenceBridge) GetBoundaryValues(labels model.LabelSet, interval *m
|
||||||
sampleSets := []*model.SampleSet{}
|
sampleSets := []*model.SampleSet{}
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
// TODO: change to GetBoundaryValues() once it has the right return type.
|
// TODO: change to GetBoundaryValues() once it has the right return type.
|
||||||
sampleSet, err := p.persistence.GetRangeValues(metric, interval, stalenessPolicy)
|
sampleSet, err := p.persistence.GetRangeValues(metric, interval, p.stalenessPolicy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -85,7 +76,7 @@ func (p *PersistenceBridge) GetBoundaryValues(labels model.LabelSet, interval *m
|
||||||
return sampleSets, nil
|
return sampleSets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PersistenceBridge) GetRangeValues(labels model.LabelSet, interval *model.Interval, stalenessPolicy *metric.StalenessPolicy) ([]*model.SampleSet, error) {
|
func (p *PersistenceAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) ([]*model.SampleSet, error) {
|
||||||
metrics, err := p.getMetricsWithLabels(labels)
|
metrics, err := p.getMetricsWithLabels(labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -93,7 +84,7 @@ func (p *PersistenceBridge) GetRangeValues(labels model.LabelSet, interval *mode
|
||||||
|
|
||||||
sampleSets := []*model.SampleSet{}
|
sampleSets := []*model.SampleSet{}
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
sampleSet, err := p.persistence.GetRangeValues(metric, interval, stalenessPolicy)
|
sampleSet, err := p.persistence.GetRangeValues(metric, interval, p.stalenessPolicy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -108,8 +99,14 @@ func (p *PersistenceBridge) GetRangeValues(labels model.LabelSet, interval *mode
|
||||||
return sampleSets, nil
|
return sampleSets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetPersistence(p metric.MetricPersistence) {
|
func SetPersistence(persistence metric.MetricPersistence, policy *metric.StalenessPolicy) {
|
||||||
persistence = &PersistenceBridge{
|
if policy == nil {
|
||||||
persistence: p,
|
policy = &metric.StalenessPolicy{
|
||||||
|
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
persistenceAdapter = &PersistenceAdapter{
|
||||||
|
persistence: persistence,
|
||||||
|
stalenessPolicy: policy,
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -173,7 +173,7 @@ func TestExpressions(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
storeMatrix(persistence, testMatrix)
|
storeMatrix(persistence, testMatrix)
|
||||||
ast.SetPersistence(persistence)
|
ast.SetPersistence(persistence, nil)
|
||||||
|
|
||||||
for _, exprTest := range expressionTests {
|
for _, exprTest := range expressionTests {
|
||||||
expectedLines := annotateWithTime(exprTest.output)
|
expectedLines := annotateWithTime(exprTest.output)
|
||||||
|
|
Loading…
Reference in a new issue