mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #546 from prometheus/fix-label-grouping
Fix aggregation grouping key calculation.
This commit is contained in:
commit
452c88964a
|
@ -19,7 +19,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
clientmodel "github.com/prometheus/client_golang/model"
|
clientmodel "github.com/prometheus/client_golang/model"
|
||||||
|
@ -375,28 +374,8 @@ func (node *ScalarFunctionCall) Eval(timestamp clientmodel.Timestamp) clientmode
|
||||||
func (node *VectorAggregation) labelsToGroupingKey(labels clientmodel.Metric) uint64 {
|
func (node *VectorAggregation) labelsToGroupingKey(labels clientmodel.Metric) uint64 {
|
||||||
summer := fnv.New64a()
|
summer := fnv.New64a()
|
||||||
for _, label := range node.groupBy {
|
for _, label := range node.groupBy {
|
||||||
fmt.Fprint(summer, labels[label])
|
summer.Write([]byte(labels[label]))
|
||||||
}
|
summer.Write([]byte{clientmodel.SeparatorByte})
|
||||||
|
|
||||||
return summer.Sum64()
|
|
||||||
}
|
|
||||||
|
|
||||||
func labelsToKey(labels clientmodel.Metric) uint64 {
|
|
||||||
pairs := metric.LabelPairs{}
|
|
||||||
|
|
||||||
for label, value := range labels {
|
|
||||||
pairs = append(pairs, &metric.LabelPair{
|
|
||||||
Name: label,
|
|
||||||
Value: value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Sort(pairs)
|
|
||||||
|
|
||||||
summer := fnv.New64a()
|
|
||||||
|
|
||||||
for _, pair := range pairs {
|
|
||||||
fmt.Fprint(summer, pair.Name, pair.Value)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return summer.Sum64()
|
return summer.Sum64()
|
||||||
|
@ -435,7 +414,7 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod
|
||||||
defer closer.Close()
|
defer closer.Close()
|
||||||
|
|
||||||
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
|
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
|
||||||
sampleStreams := map[uint64]*SampleStream{}
|
sampleStreams := map[clientmodel.Fingerprint]*SampleStream{}
|
||||||
for t := start; !t.After(end); t = t.Add(interval) {
|
for t := start; !t.After(end); t = t.Add(interval) {
|
||||||
if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
|
if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
|
||||||
evalTimer.Stop()
|
evalTimer.Stop()
|
||||||
|
@ -447,14 +426,14 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod
|
||||||
Value: sample.Value,
|
Value: sample.Value,
|
||||||
Timestamp: sample.Timestamp,
|
Timestamp: sample.Timestamp,
|
||||||
}
|
}
|
||||||
groupingKey := labelsToKey(sample.Metric.Metric)
|
fp := sample.Metric.Metric.Fingerprint()
|
||||||
if sampleStreams[groupingKey] == nil {
|
if sampleStreams[fp] == nil {
|
||||||
sampleStreams[groupingKey] = &SampleStream{
|
sampleStreams[fp] = &SampleStream{
|
||||||
Metric: sample.Metric,
|
Metric: sample.Metric,
|
||||||
Values: metric.Values{samplePair},
|
Values: metric.Values{samplePair},
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sampleStreams[groupingKey].Values = append(sampleStreams[groupingKey].Values, samplePair)
|
sampleStreams[fp].Values = append(sampleStreams[fp].Values, samplePair)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,6 +184,27 @@ var testMatrix = ast.Matrix{
|
||||||
},
|
},
|
||||||
Values: append(getTestValueStream(0, 90, 10, testStartTime), getTestValueStream(0, 0, 10, testStartTime.Add(testSampleInterval*10))...),
|
Values: append(getTestValueStream(0, 90, 10, testStartTime), getTestValueStream(0, 0, 10, testStartTime.Add(testSampleInterval*10))...),
|
||||||
},
|
},
|
||||||
|
// For label-key grouping regression test.
|
||||||
|
{
|
||||||
|
Metric: clientmodel.COWMetric{
|
||||||
|
Metric: clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "label_grouping_test",
|
||||||
|
"a": "aa",
|
||||||
|
"b": "bb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Values: getTestValueStream(0, 100, 10, testStartTime),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Metric: clientmodel.COWMetric{
|
||||||
|
Metric: clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "label_grouping_test",
|
||||||
|
"a": "a",
|
||||||
|
"b": "abb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Values: getTestValueStream(0, 200, 20, testStartTime),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var testVector = getTestVectorFromTestMatrix(testMatrix)
|
var testVector = getTestVectorFromTestMatrix(testMatrix)
|
||||||
|
|
|
@ -553,6 +553,8 @@ func TestExpressions(t *testing.T) {
|
||||||
`testcounter_reset_end => 0 @[%v]`,
|
`testcounter_reset_end => 0 @[%v]`,
|
||||||
`testcounter_reset_middle => 50 @[%v]`,
|
`testcounter_reset_middle => 50 @[%v]`,
|
||||||
`x{y="testvalue"} => 100 @[%v]`,
|
`x{y="testvalue"} => 100 @[%v]`,
|
||||||
|
`label_grouping_test{a="a", b="abb"} => 200 @[%v]`,
|
||||||
|
`label_grouping_test{a="aa", b="bb"} => 100 @[%v]`,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -641,6 +643,14 @@ func TestExpressions(t *testing.T) {
|
||||||
expr: `sum(http_requests) offset 5m`,
|
expr: `sum(http_requests) offset 5m`,
|
||||||
shouldFail: true,
|
shouldFail: true,
|
||||||
},
|
},
|
||||||
|
// Regression test for missing separator byte in labelsToGroupingKey.
|
||||||
|
{
|
||||||
|
expr: `sum(label_grouping_test) by (a, b)`,
|
||||||
|
output: []string{
|
||||||
|
`{a="a", b="abb"} => 200 @[%v]`,
|
||||||
|
`{a="aa", b="bb"} => 100 @[%v]`,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
storage, closer := newTestStorage(t)
|
storage, closer := newTestStorage(t)
|
||||||
|
@ -850,6 +860,70 @@ func TestRangedEvaluationRegressions(t *testing.T) {
|
||||||
},
|
},
|
||||||
expr: "sum(testmetric) keeping_extra",
|
expr: "sum(testmetric) keeping_extra",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// Testing metric fingerprint grouping behavior.
|
||||||
|
in: ast.Matrix{
|
||||||
|
{
|
||||||
|
Metric: clientmodel.COWMetric{
|
||||||
|
Metric: clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "testmetric",
|
||||||
|
"aa": "bb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Values: metric.Values{
|
||||||
|
{
|
||||||
|
Timestamp: testStartTime,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Metric: clientmodel.COWMetric{
|
||||||
|
Metric: clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "testmetric",
|
||||||
|
"a": "abb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Values: metric.Values{
|
||||||
|
{
|
||||||
|
Timestamp: testStartTime,
|
||||||
|
Value: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
out: ast.Matrix{
|
||||||
|
{
|
||||||
|
Metric: clientmodel.COWMetric{
|
||||||
|
Metric: clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "testmetric",
|
||||||
|
"aa": "bb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Values: metric.Values{
|
||||||
|
{
|
||||||
|
Timestamp: testStartTime,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Metric: clientmodel.COWMetric{
|
||||||
|
Metric: clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "testmetric",
|
||||||
|
"a": "abb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Values: metric.Values{
|
||||||
|
{
|
||||||
|
Timestamp: testStartTime,
|
||||||
|
Value: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expr: "testmetric",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, s := range scenarios {
|
for i, s := range scenarios {
|
||||||
|
|
Loading…
Reference in a new issue