diff --git a/rules/alerting.go b/rules/alerting.go index f1961e42a..a779d6959 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -80,7 +80,7 @@ type Alert struct { } // sample returns a Sample suitable for recording the alert. -func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *clientmodel.Sample { +func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *ast.Sample { recordedMetric := clientmodel.Metric{} for label, value := range a.Labels { recordedMetric[label] = value @@ -90,8 +90,11 @@ func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleV recordedMetric[AlertNameLabel] = clientmodel.LabelValue(a.Name) recordedMetric[AlertStateLabel] = clientmodel.LabelValue(a.State.String()) - return &clientmodel.Sample{ - Metric: recordedMetric, + return &ast.Sample{ + Metric: clientmodel.COWMetric{ + Metric: recordedMetric, + Copied: true, + }, Value: value, Timestamp: timestamp, } @@ -145,18 +148,17 @@ func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage local.St // or update the expression value for existing elements. resultFingerprints := utility.Set{} for _, sample := range exprResult { - fp := new(clientmodel.Fingerprint) - fp.LoadFromMetric(sample.Metric) - resultFingerprints.Add(*fp) + fp := sample.Metric.Metric.Fingerprint() + resultFingerprints.Add(fp) - if alert, ok := rule.activeAlerts[*fp]; !ok { + if alert, ok := rule.activeAlerts[fp]; !ok { labels := clientmodel.LabelSet{} - labels.MergeFromMetric(sample.Metric) + labels.MergeFromMetric(sample.Metric.Metric) labels = labels.Merge(rule.Labels) if _, ok := labels[clientmodel.MetricNameLabel]; ok { delete(labels, clientmodel.MetricNameLabel) } - rule.activeAlerts[*fp] = &Alert{ + rule.activeAlerts[fp] = &Alert{ Name: rule.name, Labels: labels, State: Pending, diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 79cc768f2..b36385ab9 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -34,17 +34,30 @@ var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "St // ---------------------------------------------------------------------------- // Raw data value types. +// SampleStream is a stream of Values belonging to an attached COWMetric. +type SampleStream struct { + Metric clientmodel.COWMetric + Values metric.Values +} + +// Sample is a single sample belonging to a COWMetric. +type Sample struct { + Metric clientmodel.COWMetric + Value clientmodel.SampleValue + Timestamp clientmodel.Timestamp +} + // Vector is basically only an alias for clientmodel.Samples, but the // contract is that in a Vector, all Samples have the same timestamp. -type Vector clientmodel.Samples +type Vector []*Sample -// Matrix is a slice of SampleSets that implements sort.Interface and +// Matrix is a slice of SampleStreams that implements sort.Interface and // has a String method. // BUG(julius): Pointerize this. -type Matrix []metric.SampleSet +type Matrix []SampleStream type groupedAggregation struct { - labels clientmodel.Metric + labels clientmodel.COWMetric value clientmodel.SampleValue groupCount int } @@ -191,7 +204,7 @@ type ( labelMatchers metric.LabelMatchers // The series iterators are populated at query analysis time. iterators map[clientmodel.Fingerprint]local.SeriesIterator - metrics map[clientmodel.Fingerprint]clientmodel.Metric + metrics map[clientmodel.Fingerprint]clientmodel.COWMetric // Fingerprints are populated from label matchers at query analysis time. fingerprints clientmodel.Fingerprints } @@ -231,7 +244,7 @@ type ( labelMatchers metric.LabelMatchers // The series iterators are populated at query analysis time. iterators map[clientmodel.Fingerprint]local.SeriesIterator - metrics map[clientmodel.Fingerprint]clientmodel.Metric + metrics map[clientmodel.Fingerprint]clientmodel.COWMetric // Fingerprints are populated from label matchers at query analysis time. fingerprints clientmodel.Fingerprints interval time.Duration @@ -400,44 +413,43 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod // TODO implement watchdog timer for long-running queries. evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start() - sampleSets := map[uint64]*metric.SampleSet{} - for t := start; t.Before(end); t = t.Add(interval) { + sampleStreams := map[uint64]*SampleStream{} + for t := start; !t.After(end); t = t.Add(interval) { vector := node.Eval(t) for _, sample := range vector { samplePair := metric.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, } - groupingKey := labelsToKey(sample.Metric) - if sampleSets[groupingKey] == nil { - sampleSets[groupingKey] = &metric.SampleSet{ + groupingKey := labelsToKey(sample.Metric.Metric) + if sampleStreams[groupingKey] == nil { + sampleStreams[groupingKey] = &SampleStream{ Metric: sample.Metric, Values: metric.Values{samplePair}, } } else { - sampleSets[groupingKey].Values = append(sampleSets[groupingKey].Values, samplePair) + sampleStreams[groupingKey].Values = append(sampleStreams[groupingKey].Values, samplePair) } } } evalTimer.Stop() appendTimer := queryStats.GetTimer(stats.ResultAppendTime).Start() - for _, sampleSet := range sampleSets { - matrix = append(matrix, *sampleSet) + for _, sampleStream := range sampleStreams { + matrix = append(matrix, *sampleStream) } appendTimer.Stop() return matrix, nil } -func labelIntersection(metric1, metric2 clientmodel.Metric) clientmodel.Metric { - intersection := clientmodel.Metric{} - for label, value := range metric1 { - if metric2[label] == value { - intersection[label] = value +func labelIntersection(metric1, metric2 clientmodel.COWMetric) clientmodel.COWMetric { + for label, value := range metric1.Metric { + if metric2.Metric[label] != value { + metric1.Delete(label) } } - return intersection + return metric1 } func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint64]*groupedAggregation, timestamp clientmodel.Timestamp) Vector { @@ -451,7 +463,7 @@ func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint default: // For other aggregations, we already have the right value. } - sample := &clientmodel.Sample{ + sample := &Sample{ Metric: aggregation.labels, Value: aggregation.value, Timestamp: timestamp, @@ -467,7 +479,7 @@ func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector { vector := node.vector.Eval(timestamp) result := map[uint64]*groupedAggregation{} for _, sample := range vector { - groupingKey := node.labelsToGroupingKey(sample.Metric) + groupingKey := node.labelsToGroupingKey(sample.Metric.Metric) if groupedResult, ok := result[groupingKey]; ok { if node.keepExtraLabels { groupedResult.labels = labelIntersection(groupedResult.labels, sample.Metric) @@ -493,14 +505,18 @@ func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector { panic("Unknown aggregation type") } } else { - m := clientmodel.Metric{} + var m clientmodel.COWMetric if node.keepExtraLabels { m = sample.Metric - delete(m, clientmodel.MetricNameLabel) + m.Delete(clientmodel.MetricNameLabel) } else { + m = clientmodel.COWMetric{ + Metric: clientmodel.Metric{}, + Copied: true, + } for _, l := range node.groupBy { - if v, ok := sample.Metric[l]; ok { - m[l] = v + if v, ok := sample.Metric.Metric[l]; ok { + m.Set(l, v) } } } @@ -524,8 +540,8 @@ func (node *VectorSelector) Eval(timestamp clientmodel.Timestamp) Vector { sampleCandidates := it.GetValueAtTime(timestamp) samplePair := chooseClosestSample(sampleCandidates, timestamp) if samplePair != nil { - samples = append(samples, &clientmodel.Sample{ - Metric: node.metrics[fp], // TODO: need copy here because downstream can modify! + samples = append(samples, &Sample{ + Metric: node.metrics[fp], Value: samplePair.Value, Timestamp: timestamp, }) @@ -737,7 +753,7 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector { if keep { rhsSample.Value = value if node.opType.shouldDropMetric() { - delete(rhsSample.Metric, clientmodel.MetricNameLabel) + rhsSample.Metric.Delete(clientmodel.MetricNameLabel) } result = append(result, rhsSample) } @@ -751,7 +767,7 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector { if keep { lhsSample.Value = value if node.opType.shouldDropMetric() { - delete(lhsSample.Metric, clientmodel.MetricNameLabel) + lhsSample.Metric.Delete(clientmodel.MetricNameLabel) } result = append(result, lhsSample) } @@ -762,12 +778,12 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector { rhs := node.rhs.(VectorNode).Eval(timestamp) for _, lhsSample := range lhs { for _, rhsSample := range rhs { - if labelsEqual(lhsSample.Metric, rhsSample.Metric) { + if labelsEqual(lhsSample.Metric.Metric, rhsSample.Metric.Metric) { value, keep := evalVectorBinop(node.opType, lhsSample.Value, rhsSample.Value) if keep { lhsSample.Value = value if node.opType.shouldDropMetric() { - delete(lhsSample.Metric, clientmodel.MetricNameLabel) + lhsSample.Metric.Delete(clientmodel.MetricNameLabel) } result = append(result, lhsSample) } @@ -788,21 +804,21 @@ func (node *MatrixSelector) Eval(timestamp clientmodel.Timestamp) Matrix { } //// timer := v.stats.GetTimer(stats.GetRangeValuesTime).Start() - sampleSets := []metric.SampleSet{} + sampleStreams := []SampleStream{} for fp, it := range node.iterators { samplePairs := it.GetRangeValues(*interval) if len(samplePairs) == 0 { continue } - sampleSet := metric.SampleSet{ - Metric: node.metrics[fp], // TODO: need copy here because downstream can modify! + sampleStream := SampleStream{ + Metric: node.metrics[fp], Values: samplePairs, } - sampleSets = append(sampleSets, sampleSet) + sampleStreams = append(sampleStreams, sampleStream) } //// timer.Stop() - return sampleSets + return sampleStreams } // EvalBoundaries implements the MatrixNode interface and returns the @@ -814,21 +830,21 @@ func (node *MatrixSelector) EvalBoundaries(timestamp clientmodel.Timestamp) Matr } //// timer := v.stats.GetTimer(stats.GetBoundaryValuesTime).Start() - sampleSets := []metric.SampleSet{} + sampleStreams := []SampleStream{} for fp, it := range node.iterators { samplePairs := it.GetBoundaryValues(*interval) if len(samplePairs) == 0 { continue } - sampleSet := metric.SampleSet{ - Metric: node.metrics[fp], // TODO: make copy of metric. + sampleStream := SampleStream{ + Metric: node.metrics[fp], Values: samplePairs, } - sampleSets = append(sampleSets, sampleSet) + sampleStreams = append(sampleStreams, sampleStream) } //// timer.Stop() - return sampleSets + return sampleStreams } // Len implements sort.Interface. @@ -874,7 +890,7 @@ func NewVectorSelector(m metric.LabelMatchers) *VectorSelector { return &VectorSelector{ labelMatchers: m, iterators: map[clientmodel.Fingerprint]local.SeriesIterator{}, - metrics: map[clientmodel.Fingerprint]clientmodel.Metric{}, + metrics: map[clientmodel.Fingerprint]clientmodel.COWMetric{}, } } @@ -967,7 +983,7 @@ func NewMatrixSelector(vector *VectorSelector, interval time.Duration) *MatrixSe labelMatchers: vector.labelMatchers, interval: interval, iterators: map[clientmodel.Fingerprint]local.SeriesIterator{}, - metrics: map[clientmodel.Fingerprint]clientmodel.Metric{}, + metrics: map[clientmodel.Fingerprint]clientmodel.COWMetric{}, } } diff --git a/rules/ast/functions.go b/rules/ast/functions.go index 328d6b83d..3604cc87d 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -128,12 +128,12 @@ func deltaImpl(timestamp clientmodel.Timestamp, args []Node) interface{} { intervalCorrection := clientmodel.SampleValue(targetInterval) / clientmodel.SampleValue(sampledInterval) resultValue *= intervalCorrection - resultSample := &clientmodel.Sample{ + resultSample := &Sample{ Metric: samples.Metric, Value: resultValue, Timestamp: timestamp, } - delete(resultSample.Metric, clientmodel.MetricNameLabel) + resultSample.Metric.Delete(clientmodel.MetricNameLabel) resultVector = append(resultVector, resultSample) } return resultVector @@ -169,7 +169,7 @@ func (s vectorByValueHeap) Swap(i, j int) { } func (s *vectorByValueHeap) Push(x interface{}) { - *s = append(*s, x.(*clientmodel.Sample)) + *s = append(*s, x.(*Sample)) } func (s *vectorByValueHeap) Pop() interface{} { @@ -254,7 +254,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac return Vector{} } common := clientmodel.LabelSet{} - for k, v := range vector[0].Metric { + for k, v := range vector[0].Metric.Metric { // TODO(julius): Should we also drop common metric names? if k == clientmodel.MetricNameLabel { continue @@ -264,7 +264,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac for _, el := range vector[1:] { for k, v := range common { - if el.Metric[k] != v { + if el.Metric.Metric[k] != v { // Deletion of map entries while iterating over them is safe. // From http://golang.org/ref/spec#For_statements: // "If map entries that have not yet been reached are deleted during @@ -275,88 +275,15 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac } for _, el := range vector { - for k := range el.Metric { + for k := range el.Metric.Metric { if _, ok := common[k]; ok { - delete(el.Metric, k) + el.Metric.Delete(k) } } } return vector } -// === sampleVectorImpl() Vector === -func sampleVectorImpl(timestamp clientmodel.Timestamp, args []Node) interface{} { - return Vector{ - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "0", - }, - Value: 10, - Timestamp: timestamp, - }, - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "1", - }, - Value: 20, - Timestamp: timestamp, - }, - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "2", - }, - Value: 30, - Timestamp: timestamp, - }, - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "3", - "group": "canary", - }, - Value: 40, - Timestamp: timestamp, - }, - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "2", - "group": "canary", - }, - Value: 40, - Timestamp: timestamp, - }, - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "3", - "group": "mytest", - }, - Value: 40, - Timestamp: timestamp, - }, - &clientmodel.Sample{ - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "3", - "group": "mytest", - }, - Value: 40, - Timestamp: timestamp, - }, - } -} - // === scalar(node VectorNode) Scalar === func scalarImpl(timestamp clientmodel.Timestamp, args []Node) interface{} { v := args[0].(VectorNode).Eval(timestamp) @@ -381,8 +308,8 @@ func aggrOverTime(timestamp clientmodel.Timestamp, args []Node, aggrFn func(metr continue } - delete(el.Metric, clientmodel.MetricNameLabel) - resultVector = append(resultVector, &clientmodel.Sample{ + el.Metric.Delete(clientmodel.MetricNameLabel) + resultVector = append(resultVector, &Sample{ Metric: el.Metric, Value: aggrFn(el.Values), Timestamp: timestamp, @@ -447,7 +374,7 @@ func absImpl(timestamp clientmodel.Timestamp, args []Node) interface{} { n := args[0].(VectorNode) vector := n.Eval(timestamp) for _, el := range vector { - delete(el.Metric, clientmodel.MetricNameLabel) + el.Metric.Delete(clientmodel.MetricNameLabel) el.Value = clientmodel.SampleValue(math.Abs(float64(el.Value))) } return vector @@ -468,8 +395,11 @@ func absentImpl(timestamp clientmodel.Timestamp, args []Node) interface{} { } } return Vector{ - &clientmodel.Sample{ - Metric: m, + &Sample{ + Metric: clientmodel.COWMetric{ + Metric: m, + Copied: true, + }, Value: 1, Timestamp: timestamp, }, @@ -543,12 +473,6 @@ var functions = map[string]*Function{ returnType: VECTOR, callFn: rateImpl, }, - "sampleVector": { - name: "sampleVector", - argTypes: []ExprType{}, - returnType: VECTOR, - callFn: sampleVectorImpl, - }, "scalar": { name: "scalar", argTypes: []ExprType{VECTOR}, diff --git a/rules/ast/functions_test.go b/rules/ast/functions_test.go index fa4d1da71..86add4eae 100644 --- a/rules/ast/functions_test.go +++ b/rules/ast/functions_test.go @@ -30,8 +30,10 @@ func (node emptyRangeNode) Children() Nodes { return Nodes{} } func (node emptyRangeNode) Eval(timestamp clientmodel.Timestamp) Matrix { return Matrix{ - metric.SampleSet{ - Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"}, + SampleStream{ + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"}, + }, Values: metric.Values{}, }, } @@ -39,8 +41,10 @@ func (node emptyRangeNode) Eval(timestamp clientmodel.Timestamp) Matrix { func (node emptyRangeNode) EvalBoundaries(timestamp clientmodel.Timestamp) Matrix { return Matrix{ - metric.SampleSet{ - Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"}, + SampleStream{ + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"}, + }, Values: metric.Values{}, }, } diff --git a/rules/ast/printer.go b/rules/ast/printer.go index 04068e8a9..0155a5838 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -90,20 +90,21 @@ func (vector Vector) String() string { func (matrix Matrix) String() string { metricStrings := make([]string, 0, len(matrix)) - for _, sampleSet := range matrix { - metricName, ok := sampleSet.Metric[clientmodel.MetricNameLabel] - if !ok { - panic("Tried to print matrix without metric name") + for _, sampleStream := range matrix { + metricName, hasName := sampleStream.Metric.Metric[clientmodel.MetricNameLabel] + numLabels := len(sampleStream.Metric.Metric) + if hasName { + numLabels-- } - labelStrings := make([]string, 0, len(sampleSet.Metric)-1) - for label, value := range sampleSet.Metric { + labelStrings := make([]string, 0, numLabels) + for label, value := range sampleStream.Metric.Metric { if label != clientmodel.MetricNameLabel { labelStrings = append(labelStrings, fmt.Sprintf("%s=%q", label, value)) } } sort.Strings(labelStrings) - valueStrings := make([]string, 0, len(sampleSet.Values)) - for _, value := range sampleSet.Values { + valueStrings := make([]string, 0, len(sampleStream.Values)) + for _, value := range sampleStream.Values { valueStrings = append(valueStrings, fmt.Sprintf("\n%v @[%v]", value.Value, value.Timestamp)) } @@ -218,7 +219,7 @@ func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Stor case SCALAR: scalar := node.(ScalarNode).Eval(timestamp) evalTimer.Stop() - return Vector{&clientmodel.Sample{Value: scalar}}, nil + return Vector{&Sample{Value: scalar}}, nil case VECTOR: vector := node.(VectorNode).Eval(timestamp) evalTimer.Stop() @@ -228,8 +229,16 @@ func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Stor case STRING: str := node.(StringNode).Eval(timestamp) evalTimer.Stop() - return Vector{&clientmodel.Sample{ - Metric: clientmodel.Metric{"__value__": clientmodel.LabelValue(str)}}}, nil + return Vector{ + &Sample{ + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + "__value__": clientmodel.LabelValue(str), + }, + Copied: true, + }, + }, + }, nil } panic("Switch didn't cover all node types") } diff --git a/rules/helpers_test.go b/rules/helpers_test.go index aaf249056..2abf5adf6 100644 --- a/rules/helpers_test.go +++ b/rules/helpers_test.go @@ -41,10 +41,10 @@ func getTestValueStream(startVal clientmodel.SampleValue, endVal clientmodel.Sam func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { vector := ast.Vector{} - for _, sampleSet := range matrix { - lastSample := sampleSet.Values[len(sampleSet.Values)-1] - vector = append(vector, &clientmodel.Sample{ - Metric: sampleSet.Metric, + for _, sampleStream := range matrix { + lastSample := sampleStream.Values[len(sampleStream.Values)-1] + vector = append(vector, &ast.Sample{ + Metric: sampleStream.Metric, Value: lastSample.Value, Timestamp: lastSample.Timestamp, }) @@ -54,10 +54,10 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { func storeMatrix(storage local.Storage, matrix ast.Matrix) { pendingSamples := clientmodel.Samples{} - for _, sampleSet := range matrix { - for _, sample := range sampleSet.Values { + for _, sampleStream := range matrix { + for _, sample := range sampleStream.Values { pendingSamples = append(pendingSamples, &clientmodel.Sample{ - Metric: sampleSet.Metric, + Metric: sampleStream.Metric.Metric, Value: sample.Value, Timestamp: sample.Timestamp, }) @@ -69,96 +69,118 @@ func storeMatrix(storage local.Storage, matrix ast.Matrix) { var testMatrix = ast.Matrix{ { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "0", - "group": "production", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "api-server", + "instance": "0", + "group": "production", + }, }, Values: getTestValueStream(0, 100, 10, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "1", - "group": "production", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "api-server", + "instance": "1", + "group": "production", + }, }, Values: getTestValueStream(0, 200, 20, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "0", - "group": "canary", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "api-server", + "instance": "0", + "group": "canary", + }, }, Values: getTestValueStream(0, 300, 30, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "api-server", - "instance": "1", - "group": "canary", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "api-server", + "instance": "1", + "group": "canary", + }, }, Values: getTestValueStream(0, 400, 40, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "app-server", - "instance": "0", - "group": "production", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "app-server", + "instance": "0", + "group": "production", + }, }, Values: getTestValueStream(0, 500, 50, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "app-server", - "instance": "1", - "group": "production", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "app-server", + "instance": "1", + "group": "production", + }, }, Values: getTestValueStream(0, 600, 60, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "app-server", - "instance": "0", - "group": "canary", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "app-server", + "instance": "0", + "group": "canary", + }, }, Values: getTestValueStream(0, 700, 70, testStartTime), }, { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "http_requests", - clientmodel.JobLabel: "app-server", - "instance": "1", - "group": "canary", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "http_requests", + clientmodel.JobLabel: "app-server", + "instance": "1", + "group": "canary", + }, }, Values: getTestValueStream(0, 800, 80, testStartTime), }, // Single-letter metric and label names. { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "x", - "y": "testvalue", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "x", + "y": "testvalue", + }, }, Values: getTestValueStream(0, 100, 10, testStartTime), }, // Counter reset in the middle of range. { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "testcounter_reset_middle", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testcounter_reset_middle", + }, }, Values: append(getTestValueStream(0, 40, 10, testStartTime), getTestValueStream(0, 50, 10, testStartTime.Add(testSampleInterval*5))...), }, // Counter reset at the end of range. { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "testcounter_reset_end", + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testcounter_reset_end", + }, }, Values: append(getTestValueStream(0, 90, 10, testStartTime), getTestValueStream(0, 0, 10, testStartTime.Add(testSampleInterval*10))...), }, diff --git a/rules/manager/manager.go b/rules/manager/manager.go index 4e1d6957c..0c3324a65 100644 --- a/rules/manager/manager.go +++ b/rules/manager/manager.go @@ -222,7 +222,13 @@ func (m *ruleManager) runIteration(results chan<- *extraction.Result) { duration := time.Since(start) samples := make(clientmodel.Samples, len(vector)) - copy(samples, vector) + for i, s := range vector { + samples[i] = &clientmodel.Sample{ + Metric: s.Metric.Metric, + Value: s.Value, + Timestamp: s.Timestamp, + } + } m.results <- &extraction.Result{ Samples: samples, Err: err, diff --git a/rules/recording.go b/rules/recording.go index 59e869f7e..3f39ecdb9 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -50,12 +50,12 @@ func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage local.St // Override the metric name and labels. for _, sample := range vector { - sample.Metric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(rule.name) + sample.Metric.Set(clientmodel.MetricNameLabel, clientmodel.LabelValue(rule.name)) for label, value := range rule.labels { if value == "" { - delete(sample.Metric, label) + sample.Metric.Delete(label) } else { - sample.Metric[label] = value + sample.Metric.Set(label, value) } } } diff --git a/rules/rules_test.go b/rules/rules_test.go index 094965255..028da4006 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/stats" "github.com/prometheus/prometheus/storage/local" + "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility/test" ) @@ -60,7 +61,7 @@ func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) { func TestExpressions(t *testing.T) { // Labels in expected output need to be alphabetically sorted. - var expressionTests = []struct { + expressionTests := []struct { expr string output []string shouldFail bool @@ -705,6 +706,188 @@ func TestExpressions(t *testing.T) { } } +func TestRangedEvaluationRegressions(t *testing.T) { + scenarios := []struct { + in ast.Matrix + out ast.Matrix + expr string + }{ + { + // Testing COWMetric behavior in drop_common_labels. + in: ast.Matrix{ + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "testlabel": "1", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime, + Value: 1, + }, + { + Timestamp: testStartTime.Add(time.Hour), + Value: 1, + }, + }, + }, + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "testlabel": "2", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime.Add(time.Hour), + Value: 2, + }, + }, + }, + }, + out: ast.Matrix{ + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime, + Value: 1, + }, + }, + }, + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "testlabel": "1", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime.Add(time.Hour), + Value: 1, + }, + }, + }, + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "testlabel": "2", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime.Add(time.Hour), + Value: 2, + }, + }, + }, + }, + expr: "drop_common_labels(testmetric)", + }, + { + // Testing COWMetric behavior in vector aggregation. + in: ast.Matrix{ + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "testlabel": "1", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime, + Value: 1, + }, + { + Timestamp: testStartTime.Add(time.Hour), + Value: 1, + }, + }, + }, + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "testlabel": "2", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime, + Value: 2, + }, + }, + }, + }, + out: ast.Matrix{ + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{}, + }, + Values: metric.Values{ + { + Timestamp: testStartTime, + Value: 3, + }, + }, + }, + { + Metric: clientmodel.COWMetric{ + Metric: clientmodel.Metric{ + "testlabel": "1", + }, + }, + Values: metric.Values{ + { + Timestamp: testStartTime.Add(time.Hour), + Value: 1, + }, + }, + }, + }, + expr: "sum(testmetric) keeping_extra", + }, + } + + for i, s := range scenarios { + storage, closer := local.NewTestStorage(t) + storeMatrix(storage, s.in) + + expr, err := LoadExprFromString(s.expr) + if err != nil { + t.Fatalf("%d. Error parsing expression: %v", i, err) + } + + got, err := ast.EvalVectorRange( + expr.(ast.VectorNode), + testStartTime, + testStartTime.Add(time.Hour), + time.Hour, + storage, + stats.NewTimerGroup(), + ) + if err != nil { + t.Fatalf("%d. Error evaluating expression: %v", i, err) + } + + if got.String() != s.out.String() { + t.Fatalf("%d. Expression: %s\n\ngot:\n=====\n%v\n====\n\nwant:\n=====\n%v\n=====\n", i, s.expr, got.String(), s.out.String()) + } + + closer.Close() + } +} + var ruleTests = []struct { inputFile string shouldFail bool diff --git a/storage/local/interface.go b/storage/local/interface.go index 4b8b8e39e..733da1a76 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -14,9 +14,9 @@ package local import ( - "time" clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/client_golang/prometheus" + "time" "github.com/prometheus/prometheus/storage/metric" ) @@ -38,7 +38,7 @@ type Storage interface { // Get all of the label values that are associated with a given label name. GetLabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues // Get the metric associated with the provided fingerprint. - GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.Metric + GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric // Construct an iterator for a given fingerprint. NewIterator(clientmodel.Fingerprint) SeriesIterator // Run the various maintenance loops in goroutines. Returns when the diff --git a/storage/local/storage.go b/storage/local/storage.go index 7544e0149..0618ba57d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -323,25 +323,25 @@ func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.L } // GetMetricForFingerprint implements Storage. -func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric { +func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series, ok := s.fpToSeries.get(fp) if ok { - // Copy required here because caller might mutate the returned - // metric. - m := make(clientmodel.Metric, len(series.metric)) - for ln, lv := range series.metric { - m[ln] = lv + // Wrap the returned metric in a copy-on-write (COW) metric here because + // the caller might mutate it. + return clientmodel.COWMetric{ + Metric: series.metric, } - return m } metric, err := s.persistence.getArchivedMetric(fp) if err != nil { glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) } - return metric + return clientmodel.COWMetric{ + Metric: metric, + } } // AppendSamples implements Storage. diff --git a/storage/metric/sample.go b/storage/metric/sample.go index 2291fd743..532f0b900 100644 --- a/storage/metric/sample.go +++ b/storage/metric/sample.go @@ -47,12 +47,6 @@ func (s *SamplePair) String() string { // Values is a slice of SamplePairs. type Values []SamplePair -// SampleSet is Values with a Metric attached. -type SampleSet struct { - Metric clientmodel.Metric - Values Values -} - // Interval describes the inclusive interval between two Timestamps. type Interval struct { OldestInclusive clientmodel.Timestamp diff --git a/templates/templates.go b/templates/templates.go index 03e94e4e4..9b0928b31 100644 --- a/templates/templates.go +++ b/templates/templates.go @@ -76,7 +76,7 @@ func query(q string, timestamp clientmodel.Timestamp, storage local.Storage) (qu Value: float64(v.Value), Labels: make(map[string]string), } - for label, value := range v.Metric { + for label, value := range v.Metric.Metric { s.Labels[string(label)] = string(value) } result[n] = &s