Merge pull request #420 from prometheus/metric-cow

Introduce copy-on-write for metrics in AST.
Authored by Björn Rabenstein on 2014-12-16 16:24:27 +01:00
commit d72d49f1b3
13 changed files with 395 additions and 235 deletions
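
Note on the COWMetric type used throughout this diff: the copy-on-write wrapper itself lives in the client_golang model package (imported as clientmodel) and is not part of this change. The sketch below only illustrates the semantics the AST code relies on; the field names Metric and Copied and the methods Set and Delete are taken from their usage in the diff, everything else is an assumption rather than the authoritative client_golang implementation.

package model

// COWMetric wraps a Metric with copy-on-write semantics: the wrapped label
// map stays shared until the first mutation, at which point it is copied
// exactly once. (Sketch only; see note above.)
type COWMetric struct {
	Copied bool   // true once Metric points to an exclusively owned copy
	Metric Metric // the (possibly shared) label set
}

// Set sets the given label to the given value, copying the metric first if
// it is still shared.
func (m *COWMetric) Set(ln LabelName, lv LabelValue) {
	m.copyMetric()
	m.Metric[ln] = lv
}

// Delete removes the given label, copying the metric first if it is still
// shared. It is a no-op if the label is not present.
func (m *COWMetric) Delete(ln LabelName) {
	if _, ok := m.Metric[ln]; !ok {
		return
	}
	m.copyMetric()
	delete(m.Metric, ln)
}

// copyMetric replaces the wrapped Metric with a private copy unless that
// has already happened.
func (m *COWMetric) copyMetric() {
	if m.Copied {
		return
	}
	cp := make(Metric, len(m.Metric))
	for ln, lv := range m.Metric {
		cp[ln] = lv
	}
	m.Metric = cp
	m.Copied = true
}

Under these semantics, code that builds a fresh metric (such as the alert and absent() samples below) can set Copied: true to avoid a pointless copy on the first mutation, while metrics handed out by the storage layer stay shared until something actually calls Set or Delete on them.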


@@ -80,7 +80,7 @@ type Alert struct {
 }
 
 // sample returns a Sample suitable for recording the alert.
-func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *clientmodel.Sample {
+func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *ast.Sample {
 	recordedMetric := clientmodel.Metric{}
 	for label, value := range a.Labels {
 		recordedMetric[label] = value
@@ -90,8 +90,11 @@ func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleV
 	recordedMetric[AlertNameLabel] = clientmodel.LabelValue(a.Name)
 	recordedMetric[AlertStateLabel] = clientmodel.LabelValue(a.State.String())
 
-	return &clientmodel.Sample{
-		Metric:    recordedMetric,
+	return &ast.Sample{
+		Metric: clientmodel.COWMetric{
+			Metric: recordedMetric,
+			Copied: true,
+		},
 		Value:     value,
 		Timestamp: timestamp,
 	}
@@ -145,18 +148,17 @@ func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage local.St
 	// or update the expression value for existing elements.
 	resultFingerprints := utility.Set{}
 	for _, sample := range exprResult {
-		fp := new(clientmodel.Fingerprint)
-		fp.LoadFromMetric(sample.Metric)
-		resultFingerprints.Add(*fp)
+		fp := sample.Metric.Metric.Fingerprint()
+		resultFingerprints.Add(fp)
 
-		if alert, ok := rule.activeAlerts[*fp]; !ok {
+		if alert, ok := rule.activeAlerts[fp]; !ok {
 			labels := clientmodel.LabelSet{}
-			labels.MergeFromMetric(sample.Metric)
+			labels.MergeFromMetric(sample.Metric.Metric)
 			labels = labels.Merge(rule.Labels)
 			if _, ok := labels[clientmodel.MetricNameLabel]; ok {
 				delete(labels, clientmodel.MetricNameLabel)
 			}
 
-			rule.activeAlerts[*fp] = &Alert{
+			rule.activeAlerts[fp] = &Alert{
 				Name:   rule.name,
 				Labels: labels,
 				State:  Pending,


@@ -34,17 +34,30 @@ var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "St
 // ----------------------------------------------------------------------------
 // Raw data value types.
 
+// SampleStream is a stream of Values belonging to an attached COWMetric.
+type SampleStream struct {
+	Metric clientmodel.COWMetric
+	Values metric.Values
+}
+
+// Sample is a single sample belonging to a COWMetric.
+type Sample struct {
+	Metric    clientmodel.COWMetric
+	Value     clientmodel.SampleValue
+	Timestamp clientmodel.Timestamp
+}
+
 // Vector is basically only an alias for clientmodel.Samples, but the
 // contract is that in a Vector, all Samples have the same timestamp.
-type Vector clientmodel.Samples
+type Vector []*Sample
 
-// Matrix is a slice of SampleSets that implements sort.Interface and
+// Matrix is a slice of SampleStreams that implements sort.Interface and
 // has a String method.
 // BUG(julius): Pointerize this.
-type Matrix []metric.SampleSet
+type Matrix []SampleStream
 
 type groupedAggregation struct {
-	labels     clientmodel.Metric
+	labels     clientmodel.COWMetric
 	value      clientmodel.SampleValue
 	groupCount int
 }
@@ -191,7 +204,7 @@ type (
 		labelMatchers metric.LabelMatchers
 		// The series iterators are populated at query analysis time.
 		iterators map[clientmodel.Fingerprint]local.SeriesIterator
-		metrics   map[clientmodel.Fingerprint]clientmodel.Metric
+		metrics   map[clientmodel.Fingerprint]clientmodel.COWMetric
 		// Fingerprints are populated from label matchers at query analysis time.
 		fingerprints clientmodel.Fingerprints
 	}
@@ -231,7 +244,7 @@ type (
 		labelMatchers metric.LabelMatchers
 		// The series iterators are populated at query analysis time.
 		iterators map[clientmodel.Fingerprint]local.SeriesIterator
-		metrics   map[clientmodel.Fingerprint]clientmodel.Metric
+		metrics   map[clientmodel.Fingerprint]clientmodel.COWMetric
 		// Fingerprints are populated from label matchers at query analysis time.
 		fingerprints clientmodel.Fingerprints
 		interval     time.Duration
@@ -400,44 +413,43 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod
 	// TODO implement watchdog timer for long-running queries.
 	evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
-	sampleSets := map[uint64]*metric.SampleSet{}
-	for t := start; t.Before(end); t = t.Add(interval) {
+	sampleStreams := map[uint64]*SampleStream{}
+	for t := start; !t.After(end); t = t.Add(interval) {
 		vector := node.Eval(t)
 		for _, sample := range vector {
 			samplePair := metric.SamplePair{
 				Value:     sample.Value,
 				Timestamp: sample.Timestamp,
 			}
-			groupingKey := labelsToKey(sample.Metric)
-			if sampleSets[groupingKey] == nil {
-				sampleSets[groupingKey] = &metric.SampleSet{
+			groupingKey := labelsToKey(sample.Metric.Metric)
+			if sampleStreams[groupingKey] == nil {
+				sampleStreams[groupingKey] = &SampleStream{
 					Metric: sample.Metric,
 					Values: metric.Values{samplePair},
 				}
 			} else {
-				sampleSets[groupingKey].Values = append(sampleSets[groupingKey].Values, samplePair)
+				sampleStreams[groupingKey].Values = append(sampleStreams[groupingKey].Values, samplePair)
 			}
 		}
 	}
 	evalTimer.Stop()
 
 	appendTimer := queryStats.GetTimer(stats.ResultAppendTime).Start()
-	for _, sampleSet := range sampleSets {
-		matrix = append(matrix, *sampleSet)
+	for _, sampleStream := range sampleStreams {
+		matrix = append(matrix, *sampleStream)
 	}
 	appendTimer.Stop()
 
 	return matrix, nil
 }
 
-func labelIntersection(metric1, metric2 clientmodel.Metric) clientmodel.Metric {
-	intersection := clientmodel.Metric{}
-	for label, value := range metric1 {
-		if metric2[label] == value {
-			intersection[label] = value
+func labelIntersection(metric1, metric2 clientmodel.COWMetric) clientmodel.COWMetric {
+	for label, value := range metric1.Metric {
+		if metric2.Metric[label] != value {
+			metric1.Delete(label)
 		}
 	}
-	return intersection
+	return metric1
 }
 
 func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint64]*groupedAggregation, timestamp clientmodel.Timestamp) Vector {
@@ -451,7 +463,7 @@ func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint
 		default:
 			// For other aggregations, we already have the right value.
 		}
-		sample := &clientmodel.Sample{
+		sample := &Sample{
 			Metric:    aggregation.labels,
 			Value:     aggregation.value,
 			Timestamp: timestamp,
@@ -467,7 +479,7 @@ func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector {
 	vector := node.vector.Eval(timestamp)
 	result := map[uint64]*groupedAggregation{}
 	for _, sample := range vector {
-		groupingKey := node.labelsToGroupingKey(sample.Metric)
+		groupingKey := node.labelsToGroupingKey(sample.Metric.Metric)
 		if groupedResult, ok := result[groupingKey]; ok {
 			if node.keepExtraLabels {
 				groupedResult.labels = labelIntersection(groupedResult.labels, sample.Metric)
@@ -493,14 +505,18 @@ func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector {
 				panic("Unknown aggregation type")
 			}
 		} else {
-			m := clientmodel.Metric{}
+			var m clientmodel.COWMetric
 			if node.keepExtraLabels {
 				m = sample.Metric
-				delete(m, clientmodel.MetricNameLabel)
+				m.Delete(clientmodel.MetricNameLabel)
 			} else {
+				m = clientmodel.COWMetric{
+					Metric: clientmodel.Metric{},
+					Copied: true,
+				}
 				for _, l := range node.groupBy {
-					if v, ok := sample.Metric[l]; ok {
-						m[l] = v
+					if v, ok := sample.Metric.Metric[l]; ok {
+						m.Set(l, v)
 					}
 				}
 			}
@@ -524,8 +540,8 @@ func (node *VectorSelector) Eval(timestamp clientmodel.Timestamp) Vector {
 		sampleCandidates := it.GetValueAtTime(timestamp)
 		samplePair := chooseClosestSample(sampleCandidates, timestamp)
 		if samplePair != nil {
-			samples = append(samples, &clientmodel.Sample{
-				Metric:    node.metrics[fp], // TODO: need copy here because downstream can modify!
+			samples = append(samples, &Sample{
+				Metric:    node.metrics[fp],
 				Value:     samplePair.Value,
 				Timestamp: timestamp,
 			})
@@ -737,7 +753,7 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
 			if keep {
 				rhsSample.Value = value
 				if node.opType.shouldDropMetric() {
-					delete(rhsSample.Metric, clientmodel.MetricNameLabel)
+					rhsSample.Metric.Delete(clientmodel.MetricNameLabel)
 				}
 				result = append(result, rhsSample)
 			}
@@ -751,7 +767,7 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
 			if keep {
 				lhsSample.Value = value
 				if node.opType.shouldDropMetric() {
-					delete(lhsSample.Metric, clientmodel.MetricNameLabel)
+					lhsSample.Metric.Delete(clientmodel.MetricNameLabel)
 				}
 				result = append(result, lhsSample)
 			}
@@ -762,12 +778,12 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
 	rhs := node.rhs.(VectorNode).Eval(timestamp)
 	for _, lhsSample := range lhs {
 		for _, rhsSample := range rhs {
-			if labelsEqual(lhsSample.Metric, rhsSample.Metric) {
+			if labelsEqual(lhsSample.Metric.Metric, rhsSample.Metric.Metric) {
 				value, keep := evalVectorBinop(node.opType, lhsSample.Value, rhsSample.Value)
 				if keep {
 					lhsSample.Value = value
 					if node.opType.shouldDropMetric() {
-						delete(lhsSample.Metric, clientmodel.MetricNameLabel)
+						lhsSample.Metric.Delete(clientmodel.MetricNameLabel)
 					}
 					result = append(result, lhsSample)
 				}
@@ -788,21 +804,21 @@ func (node *MatrixSelector) Eval(timestamp clientmodel.Timestamp) Matrix {
 	}
 
 	//// timer := v.stats.GetTimer(stats.GetRangeValuesTime).Start()
-	sampleSets := []metric.SampleSet{}
+	sampleStreams := []SampleStream{}
 	for fp, it := range node.iterators {
 		samplePairs := it.GetRangeValues(*interval)
 		if len(samplePairs) == 0 {
 			continue
 		}
 
-		sampleSet := metric.SampleSet{
-			Metric: node.metrics[fp], // TODO: need copy here because downstream can modify!
+		sampleStream := SampleStream{
+			Metric: node.metrics[fp],
 			Values: samplePairs,
 		}
-		sampleSets = append(sampleSets, sampleSet)
+		sampleStreams = append(sampleStreams, sampleStream)
 	}
 	//// timer.Stop()
-	return sampleSets
+	return sampleStreams
 }
 
 // EvalBoundaries implements the MatrixNode interface and returns the
@@ -814,21 +830,21 @@ func (node *MatrixSelector) EvalBoundaries(timestamp clientmodel.Timestamp) Matr
 	}
 
 	//// timer := v.stats.GetTimer(stats.GetBoundaryValuesTime).Start()
-	sampleSets := []metric.SampleSet{}
+	sampleStreams := []SampleStream{}
 	for fp, it := range node.iterators {
 		samplePairs := it.GetBoundaryValues(*interval)
 		if len(samplePairs) == 0 {
 			continue
 		}
 
-		sampleSet := metric.SampleSet{
-			Metric: node.metrics[fp], // TODO: make copy of metric.
+		sampleStream := SampleStream{
+			Metric: node.metrics[fp],
 			Values: samplePairs,
 		}
-		sampleSets = append(sampleSets, sampleSet)
+		sampleStreams = append(sampleStreams, sampleStream)
 	}
 	//// timer.Stop()
-	return sampleSets
+	return sampleStreams
 }
 
 // Len implements sort.Interface.
@@ -874,7 +890,7 @@ func NewVectorSelector(m metric.LabelMatchers) *VectorSelector {
 	return &VectorSelector{
 		labelMatchers: m,
 		iterators:     map[clientmodel.Fingerprint]local.SeriesIterator{},
-		metrics:       map[clientmodel.Fingerprint]clientmodel.Metric{},
+		metrics:       map[clientmodel.Fingerprint]clientmodel.COWMetric{},
 	}
 }
@@ -967,7 +983,7 @@ func NewMatrixSelector(vector *VectorSelector, interval time.Duration) *MatrixSe
 		labelMatchers: vector.labelMatchers,
 		interval:      interval,
 		iterators:     map[clientmodel.Fingerprint]local.SeriesIterator{},
-		metrics:       map[clientmodel.Fingerprint]clientmodel.Metric{},
+		metrics:       map[clientmodel.Fingerprint]clientmodel.COWMetric{},
 	}
 }


@@ -128,12 +128,12 @@ func deltaImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
 		intervalCorrection := clientmodel.SampleValue(targetInterval) / clientmodel.SampleValue(sampledInterval)
 		resultValue *= intervalCorrection
 
-		resultSample := &clientmodel.Sample{
+		resultSample := &Sample{
 			Metric:    samples.Metric,
 			Value:     resultValue,
 			Timestamp: timestamp,
 		}
-		delete(resultSample.Metric, clientmodel.MetricNameLabel)
+		resultSample.Metric.Delete(clientmodel.MetricNameLabel)
 		resultVector = append(resultVector, resultSample)
 	}
 	return resultVector
@@ -169,7 +169,7 @@ func (s vectorByValueHeap) Swap(i, j int) {
 }
 
 func (s *vectorByValueHeap) Push(x interface{}) {
-	*s = append(*s, x.(*clientmodel.Sample))
+	*s = append(*s, x.(*Sample))
 }
 
 func (s *vectorByValueHeap) Pop() interface{} {
@@ -254,7 +254,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac
 		return Vector{}
 	}
 	common := clientmodel.LabelSet{}
-	for k, v := range vector[0].Metric {
+	for k, v := range vector[0].Metric.Metric {
 		// TODO(julius): Should we also drop common metric names?
 		if k == clientmodel.MetricNameLabel {
 			continue
@@ -264,7 +264,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac
 	for _, el := range vector[1:] {
 		for k, v := range common {
-			if el.Metric[k] != v {
+			if el.Metric.Metric[k] != v {
 				// Deletion of map entries while iterating over them is safe.
 				// From http://golang.org/ref/spec#For_statements:
 				// "If map entries that have not yet been reached are deleted during
@@ -275,88 +275,15 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac
 	}
 
 	for _, el := range vector {
-		for k := range el.Metric {
+		for k := range el.Metric.Metric {
 			if _, ok := common[k]; ok {
-				delete(el.Metric, k)
+				el.Metric.Delete(k)
 			}
 		}
 	}
 	return vector
 }
// === sampleVectorImpl() Vector ===
func sampleVectorImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
return Vector{
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "0",
},
Value: 10,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "1",
},
Value: 20,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "2",
},
Value: 30,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "3",
"group": "canary",
},
Value: 40,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "2",
"group": "canary",
},
Value: 40,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "3",
"group": "mytest",
},
Value: 40,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "http_requests",
clientmodel.JobLabel: "api-server",
"instance": "3",
"group": "mytest",
},
Value: 40,
Timestamp: timestamp,
},
}
}
 // === scalar(node VectorNode) Scalar ===
 func scalarImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
 	v := args[0].(VectorNode).Eval(timestamp)
@@ -381,8 +308,8 @@ func aggrOverTime(timestamp clientmodel.Timestamp, args []Node, aggrFn func(metr
 			continue
 		}
 
-		delete(el.Metric, clientmodel.MetricNameLabel)
-		resultVector = append(resultVector, &clientmodel.Sample{
+		el.Metric.Delete(clientmodel.MetricNameLabel)
+		resultVector = append(resultVector, &Sample{
 			Metric:    el.Metric,
 			Value:     aggrFn(el.Values),
 			Timestamp: timestamp,
@@ -447,7 +374,7 @@ func absImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
 	n := args[0].(VectorNode)
 	vector := n.Eval(timestamp)
 	for _, el := range vector {
-		delete(el.Metric, clientmodel.MetricNameLabel)
+		el.Metric.Delete(clientmodel.MetricNameLabel)
 		el.Value = clientmodel.SampleValue(math.Abs(float64(el.Value)))
 	}
 	return vector
@@ -468,8 +395,11 @@ func absentImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
 		}
 	}
 	return Vector{
-		&clientmodel.Sample{
-			Metric: m,
+		&Sample{
+			Metric: clientmodel.COWMetric{
+				Metric: m,
+				Copied: true,
+			},
 			Value:     1,
 			Timestamp: timestamp,
 		},
@@ -543,12 +473,6 @@ var functions = map[string]*Function{
 		returnType: VECTOR,
 		callFn:     rateImpl,
 	},
-	"sampleVector": {
-		name:       "sampleVector",
-		argTypes:   []ExprType{},
-		returnType: VECTOR,
-		callFn:     sampleVectorImpl,
-	},
 	"scalar": {
 		name:       "scalar",
 		argTypes:   []ExprType{VECTOR},


@@ -30,8 +30,10 @@ func (node emptyRangeNode) Children() Nodes { return Nodes{} }
 func (node emptyRangeNode) Eval(timestamp clientmodel.Timestamp) Matrix {
 	return Matrix{
-		metric.SampleSet{
-			Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"},
+		SampleStream{
+			Metric: clientmodel.COWMetric{
+				Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"},
+			},
 			Values: metric.Values{},
 		},
 	}
@@ -39,8 +41,10 @@ func (node emptyRangeNode) Eval(timestamp clientmodel.Timestamp) Matrix {
 func (node emptyRangeNode) EvalBoundaries(timestamp clientmodel.Timestamp) Matrix {
 	return Matrix{
-		metric.SampleSet{
-			Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"},
+		SampleStream{
+			Metric: clientmodel.COWMetric{
+				Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"},
+			},
 			Values: metric.Values{},
 		},
 	}


@@ -90,20 +90,21 @@ func (vector Vector) String() string {
 func (matrix Matrix) String() string {
 	metricStrings := make([]string, 0, len(matrix))
-	for _, sampleSet := range matrix {
-		metricName, ok := sampleSet.Metric[clientmodel.MetricNameLabel]
-		if !ok {
-			panic("Tried to print matrix without metric name")
+	for _, sampleStream := range matrix {
+		metricName, hasName := sampleStream.Metric.Metric[clientmodel.MetricNameLabel]
+		numLabels := len(sampleStream.Metric.Metric)
+		if hasName {
+			numLabels--
 		}
-		labelStrings := make([]string, 0, len(sampleSet.Metric)-1)
-		for label, value := range sampleSet.Metric {
+		labelStrings := make([]string, 0, numLabels)
+		for label, value := range sampleStream.Metric.Metric {
 			if label != clientmodel.MetricNameLabel {
 				labelStrings = append(labelStrings, fmt.Sprintf("%s=%q", label, value))
 			}
 		}
 		sort.Strings(labelStrings)
-		valueStrings := make([]string, 0, len(sampleSet.Values))
-		for _, value := range sampleSet.Values {
+		valueStrings := make([]string, 0, len(sampleStream.Values))
+		for _, value := range sampleStream.Values {
 			valueStrings = append(valueStrings,
 				fmt.Sprintf("\n%v @[%v]", value.Value, value.Timestamp))
 		}
@@ -218,7 +219,7 @@ func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Stor
 	case SCALAR:
 		scalar := node.(ScalarNode).Eval(timestamp)
 		evalTimer.Stop()
-		return Vector{&clientmodel.Sample{Value: scalar}}, nil
+		return Vector{&Sample{Value: scalar}}, nil
 	case VECTOR:
 		vector := node.(VectorNode).Eval(timestamp)
 		evalTimer.Stop()
@@ -228,8 +229,16 @@ func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Stor
 	case STRING:
 		str := node.(StringNode).Eval(timestamp)
 		evalTimer.Stop()
-		return Vector{&clientmodel.Sample{
-			Metric: clientmodel.Metric{"__value__": clientmodel.LabelValue(str)}}}, nil
+		return Vector{
+			&Sample{
+				Metric: clientmodel.COWMetric{
+					Metric: clientmodel.Metric{
+						"__value__": clientmodel.LabelValue(str),
+					},
+					Copied: true,
+				},
+			},
+		}, nil
 	}
 	panic("Switch didn't cover all node types")
 }


@@ -41,10 +41,10 @@ func getTestValueStream(startVal clientmodel.SampleValue, endVal clientmodel.Sam
 func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
 	vector := ast.Vector{}
-	for _, sampleSet := range matrix {
-		lastSample := sampleSet.Values[len(sampleSet.Values)-1]
-		vector = append(vector, &clientmodel.Sample{
-			Metric:    sampleSet.Metric,
+	for _, sampleStream := range matrix {
+		lastSample := sampleStream.Values[len(sampleStream.Values)-1]
+		vector = append(vector, &ast.Sample{
+			Metric:    sampleStream.Metric,
 			Value:     lastSample.Value,
 			Timestamp: lastSample.Timestamp,
 		})
@@ -54,10 +54,10 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
 func storeMatrix(storage local.Storage, matrix ast.Matrix) {
 	pendingSamples := clientmodel.Samples{}
-	for _, sampleSet := range matrix {
-		for _, sample := range sampleSet.Values {
+	for _, sampleStream := range matrix {
+		for _, sample := range sampleStream.Values {
 			pendingSamples = append(pendingSamples, &clientmodel.Sample{
-				Metric:    sampleSet.Metric,
+				Metric:    sampleStream.Metric.Metric,
 				Value:     sample.Value,
 				Timestamp: sample.Timestamp,
 			})
@@ -69,96 +69,118 @@ func storeMatrix(storage local.Storage, matrix ast.Matrix) {
 var testMatrix = ast.Matrix{
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "api-server",
-			"instance":                  "0",
-			"group":                     "production",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "api-server",
+				"instance":                  "0",
+				"group":                     "production",
+			},
 		},
 		Values: getTestValueStream(0, 100, 10, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "api-server",
-			"instance":                  "1",
-			"group":                     "production",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "api-server",
+				"instance":                  "1",
+				"group":                     "production",
+			},
 		},
 		Values: getTestValueStream(0, 200, 20, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "api-server",
-			"instance":                  "0",
-			"group":                     "canary",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "api-server",
+				"instance":                  "0",
+				"group":                     "canary",
+			},
 		},
 		Values: getTestValueStream(0, 300, 30, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "api-server",
-			"instance":                  "1",
-			"group":                     "canary",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "api-server",
+				"instance":                  "1",
+				"group":                     "canary",
+			},
 		},
 		Values: getTestValueStream(0, 400, 40, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "app-server",
-			"instance":                  "0",
-			"group":                     "production",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "app-server",
+				"instance":                  "0",
+				"group":                     "production",
+			},
 		},
 		Values: getTestValueStream(0, 500, 50, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "app-server",
-			"instance":                  "1",
-			"group":                     "production",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "app-server",
+				"instance":                  "1",
+				"group":                     "production",
+			},
 		},
 		Values: getTestValueStream(0, 600, 60, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "app-server",
-			"instance":                  "0",
-			"group":                     "canary",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "app-server",
+				"instance":                  "0",
+				"group":                     "canary",
+			},
 		},
 		Values: getTestValueStream(0, 700, 70, testStartTime),
 	},
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "http_requests",
-			clientmodel.JobLabel:        "app-server",
-			"instance":                  "1",
-			"group":                     "canary",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "http_requests",
+				clientmodel.JobLabel:        "app-server",
+				"instance":                  "1",
+				"group":                     "canary",
+			},
 		},
 		Values: getTestValueStream(0, 800, 80, testStartTime),
 	},
 	// Single-letter metric and label names.
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "x",
-			"y":                         "testvalue",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "x",
+				"y":                         "testvalue",
+			},
 		},
 		Values: getTestValueStream(0, 100, 10, testStartTime),
 	},
 	// Counter reset in the middle of range.
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "testcounter_reset_middle",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "testcounter_reset_middle",
+			},
 		},
 		Values: append(getTestValueStream(0, 40, 10, testStartTime), getTestValueStream(0, 50, 10, testStartTime.Add(testSampleInterval*5))...),
 	},
 	// Counter reset at the end of range.
 	{
-		Metric: clientmodel.Metric{
-			clientmodel.MetricNameLabel: "testcounter_reset_end",
+		Metric: clientmodel.COWMetric{
+			Metric: clientmodel.Metric{
+				clientmodel.MetricNameLabel: "testcounter_reset_end",
+			},
 		},
 		Values: append(getTestValueStream(0, 90, 10, testStartTime), getTestValueStream(0, 0, 10, testStartTime.Add(testSampleInterval*10))...),
 	},


@@ -222,7 +222,13 @@ func (m *ruleManager) runIteration(results chan<- *extraction.Result) {
 			duration := time.Since(start)
 
 			samples := make(clientmodel.Samples, len(vector))
-			copy(samples, vector)
+			for i, s := range vector {
+				samples[i] = &clientmodel.Sample{
+					Metric:    s.Metric.Metric,
+					Value:     s.Value,
+					Timestamp: s.Timestamp,
+				}
+			}
 			m.results <- &extraction.Result{
 				Samples: samples,
 				Err:     err,


@@ -50,12 +50,12 @@ func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage local.St
 	// Override the metric name and labels.
 	for _, sample := range vector {
-		sample.Metric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(rule.name)
+		sample.Metric.Set(clientmodel.MetricNameLabel, clientmodel.LabelValue(rule.name))
 
 		for label, value := range rule.labels {
 			if value == "" {
-				delete(sample.Metric, label)
+				sample.Metric.Delete(label)
 			} else {
-				sample.Metric[label] = value
+				sample.Metric.Set(label, value)
 			}
 		}
 	}


@@ -25,6 +25,7 @@ import (
 	"github.com/prometheus/prometheus/rules/ast"
 	"github.com/prometheus/prometheus/stats"
 	"github.com/prometheus/prometheus/storage/local"
+	"github.com/prometheus/prometheus/storage/metric"
 	"github.com/prometheus/prometheus/utility/test"
 )
@@ -60,7 +61,7 @@ func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) {
 func TestExpressions(t *testing.T) {
 	// Labels in expected output need to be alphabetically sorted.
-	var expressionTests = []struct {
+	expressionTests := []struct {
 		expr       string
 		output     []string
 		shouldFail bool
@@ -705,6 +706,188 @@ func TestExpressions(t *testing.T) {
 	}
 }
func TestRangedEvaluationRegressions(t *testing.T) {
scenarios := []struct {
in ast.Matrix
out ast.Matrix
expr string
}{
{
// Testing COWMetric behavior in drop_common_labels.
in: ast.Matrix{
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"testlabel": "1",
},
},
Values: metric.Values{
{
Timestamp: testStartTime,
Value: 1,
},
{
Timestamp: testStartTime.Add(time.Hour),
Value: 1,
},
},
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"testlabel": "2",
},
},
Values: metric.Values{
{
Timestamp: testStartTime.Add(time.Hour),
Value: 2,
},
},
},
},
out: ast.Matrix{
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
},
},
Values: metric.Values{
{
Timestamp: testStartTime,
Value: 1,
},
},
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"testlabel": "1",
},
},
Values: metric.Values{
{
Timestamp: testStartTime.Add(time.Hour),
Value: 1,
},
},
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"testlabel": "2",
},
},
Values: metric.Values{
{
Timestamp: testStartTime.Add(time.Hour),
Value: 2,
},
},
},
},
expr: "drop_common_labels(testmetric)",
},
{
// Testing COWMetric behavior in vector aggregation.
in: ast.Matrix{
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"testlabel": "1",
},
},
Values: metric.Values{
{
Timestamp: testStartTime,
Value: 1,
},
{
Timestamp: testStartTime.Add(time.Hour),
Value: 1,
},
},
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"testlabel": "2",
},
},
Values: metric.Values{
{
Timestamp: testStartTime,
Value: 2,
},
},
},
},
out: ast.Matrix{
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{},
},
Values: metric.Values{
{
Timestamp: testStartTime,
Value: 3,
},
},
},
{
Metric: clientmodel.COWMetric{
Metric: clientmodel.Metric{
"testlabel": "1",
},
},
Values: metric.Values{
{
Timestamp: testStartTime.Add(time.Hour),
Value: 1,
},
},
},
},
expr: "sum(testmetric) keeping_extra",
},
}
for i, s := range scenarios {
storage, closer := local.NewTestStorage(t)
storeMatrix(storage, s.in)
expr, err := LoadExprFromString(s.expr)
if err != nil {
t.Fatalf("%d. Error parsing expression: %v", i, err)
}
got, err := ast.EvalVectorRange(
expr.(ast.VectorNode),
testStartTime,
testStartTime.Add(time.Hour),
time.Hour,
storage,
stats.NewTimerGroup(),
)
if err != nil {
t.Fatalf("%d. Error evaluating expression: %v", i, err)
}
if got.String() != s.out.String() {
t.Fatalf("%d. Expression: %s\n\ngot:\n=====\n%v\n====\n\nwant:\n=====\n%v\n=====\n", i, s.expr, got.String(), s.out.String())
}
closer.Close()
}
}
 var ruleTests = []struct {
 	inputFile  string
 	shouldFail bool


@@ -14,9 +14,9 @@
 package local
 
 import (
+	"time"
+
 	clientmodel "github.com/prometheus/client_golang/model"
 	"github.com/prometheus/client_golang/prometheus"
-	"time"
 
 	"github.com/prometheus/prometheus/storage/metric"
 )
@@ -38,7 +38,7 @@ type Storage interface {
 	// Get all of the label values that are associated with a given label name.
 	GetLabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues
 	// Get the metric associated with the provided fingerprint.
-	GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.Metric
+	GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric
 	// Construct an iterator for a given fingerprint.
 	NewIterator(clientmodel.Fingerprint) SeriesIterator
 	// Run the various maintenance loops in goroutines. Returns when the


@@ -323,25 +323,25 @@ func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.L
 }
 
 // GetMetricForFingerprint implements Storage.
-func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
+func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric {
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
 	series, ok := s.fpToSeries.get(fp)
 	if ok {
-		// Copy required here because caller might mutate the returned
-		// metric.
-		m := make(clientmodel.Metric, len(series.metric))
-		for ln, lv := range series.metric {
-			m[ln] = lv
+		// Wrap the returned metric in a copy-on-write (COW) metric here because
+		// the caller might mutate it.
+		return clientmodel.COWMetric{
+			Metric: series.metric,
 		}
-		return m
 	}
 	metric, err := s.persistence.getArchivedMetric(fp)
 	if err != nil {
 		glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
 	}
-	return metric
+	return clientmodel.COWMetric{
+		Metric: metric,
+	}
 }
 
 // AppendSamples implements Storage.


@@ -47,12 +47,6 @@ func (s *SamplePair) String() string {
 // Values is a slice of SamplePairs.
 type Values []SamplePair
 
-// SampleSet is Values with a Metric attached.
-type SampleSet struct {
-	Metric clientmodel.Metric
-	Values Values
-}
-
 // Interval describes the inclusive interval between two Timestamps.
 type Interval struct {
 	OldestInclusive clientmodel.Timestamp


@@ -76,7 +76,7 @@ func query(q string, timestamp clientmodel.Timestamp, storage local.Storage) (qu
 			Value:  float64(v.Value),
 			Labels: make(map[string]string),
 		}
-		for label, value := range v.Metric {
+		for label, value := range v.Metric.Metric {
 			s.Labels[string(label)] = string(value)
 		}
 		result[n] = &s