mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Implement sum() for sparse histograms (#9948)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
4a43349aca
commit
187a767292
|
@ -2181,12 +2181,15 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64) (float64, bool) {
|
|||
}
|
||||
|
||||
type groupedAggregation struct {
|
||||
labels labels.Labels
|
||||
value float64
|
||||
mean float64
|
||||
groupCount int
|
||||
heap vectorByValueHeap
|
||||
reverseHeap vectorByReverseValueHeap
|
||||
hasFloat bool // Has at least 1 float64 sample aggregated.
|
||||
hasHistogram bool // Has at least 1 histogram sample aggregated.
|
||||
labels labels.Labels
|
||||
value float64
|
||||
histogramValue *histogram.FloatHistogram
|
||||
mean float64
|
||||
groupCount int
|
||||
heap vectorByValueHeap
|
||||
reverseHeap vectorByReverseValueHeap
|
||||
}
|
||||
|
||||
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
|
||||
|
@ -2268,6 +2271,12 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
|
|||
mean: s.V,
|
||||
groupCount: 1,
|
||||
}
|
||||
if s.H != nil {
|
||||
newAgg.histogramValue = s.H.Copy()
|
||||
newAgg.hasHistogram = true
|
||||
} else {
|
||||
newAgg.hasFloat = true
|
||||
}
|
||||
|
||||
result[groupingKey] = newAgg
|
||||
orderedResult = append(orderedResult, newAgg)
|
||||
|
@ -2302,7 +2311,13 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
|
|||
|
||||
switch op {
|
||||
case parser.SUM:
|
||||
group.value += s.V
|
||||
if s.H != nil {
|
||||
group.hasHistogram = true
|
||||
group.histogramValue.Add(s.H)
|
||||
} else {
|
||||
group.hasFloat = true
|
||||
group.value += s.V
|
||||
}
|
||||
|
||||
case parser.AVG:
|
||||
group.groupCount++
|
||||
|
@ -2436,13 +2451,18 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
|
|||
case parser.QUANTILE:
|
||||
aggr.value = quantile(q, aggr.heap)
|
||||
|
||||
case parser.SUM:
|
||||
if aggr.hasFloat && aggr.hasHistogram {
|
||||
// We cannot aggregate histogram sample with a float64 sample.
|
||||
continue
|
||||
}
|
||||
default:
|
||||
// For other aggregations, we already have the right value.
|
||||
}
|
||||
|
||||
enh.Out = append(enh.Out, Sample{
|
||||
Metric: aggr.labels,
|
||||
Point: Point{V: aggr.value},
|
||||
Point: Point{V: aggr.value, H: aggr.histogramValue},
|
||||
})
|
||||
}
|
||||
return enh.Out
|
||||
|
|
|
@ -2899,3 +2899,125 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSparseHistogram_Sum(t *testing.T) {
|
||||
// TODO(codesome): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
cases := []struct {
|
||||
histograms []histogram.Histogram
|
||||
expected histogram.FloatHistogram
|
||||
}{
|
||||
{
|
||||
histograms: []histogram.Histogram{
|
||||
{
|
||||
Schema: 0,
|
||||
Count: 9,
|
||||
Sum: 1234.5,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 4,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 1, -1, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 2},
|
||||
},
|
||||
NegativeBuckets: []int64{2, 2, -3, 8},
|
||||
},
|
||||
{
|
||||
Schema: 0,
|
||||
Count: 15,
|
||||
Sum: 2345.6,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
{Offset: 0, Length: 3},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 2, Length: 0},
|
||||
{Offset: 2, Length: 3},
|
||||
},
|
||||
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
|
||||
},
|
||||
{
|
||||
Schema: 0,
|
||||
Count: 15,
|
||||
Sum: 1111.1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
{Offset: 0, Length: 3},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 2, Length: 0},
|
||||
{Offset: 2, Length: 3},
|
||||
},
|
||||
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
|
||||
},
|
||||
},
|
||||
expected: histogram.FloatHistogram{
|
||||
Schema: 0,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 14,
|
||||
Count: 39,
|
||||
Sum: 4691.2,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 0, Length: 4},
|
||||
},
|
||||
PositiveBuckets: []float64{3, 8, 2, 5, 3, 2, 2},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
NegativeBuckets: []float64{2, 6, 8, 4, 15, 9, 10, 10, 4},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
|
||||
test, err := NewTest(t, "")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(test.Close)
|
||||
|
||||
seriesName := "sparse_histogram_series"
|
||||
|
||||
engine := test.QueryEngine()
|
||||
|
||||
ts := int64(i+1) * int64(10*time.Minute/time.Millisecond)
|
||||
app := test.Storage().Appender(context.TODO())
|
||||
for idx, h := range c.histograms {
|
||||
// TODO(codesome): Without h.Copy(), it was working weird. TSDB is keeping reference of last histogram. AppendHistogram should not take pointers!
|
||||
lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx))
|
||||
_, err = app.AppendHistogram(0, lbls, ts, h.Copy())
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
queryString := fmt.Sprintf("sum(%s)", seriesName)
|
||||
qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(ts))
|
||||
require.NoError(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
vector, err := res.Vector()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, vector, 1)
|
||||
require.Equal(t, &c.expected, vector[0].H)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue