Handle more arithmetic operators for native histograms (#12262)

Handle more arithmetic operators and aggregators for native histograms

This includes operators for multiplication (formerly known as scaling), division, and subtraction. Plus aggregations for average and the avg_over_time function.

Stdvar and stddev will (for now) ignore histograms properly (rather than counting them but adding a 0 for them).

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
zenador 2023-05-17 03:15:20 +08:00 committed by GitHub
parent 30e263cf96
commit 191bf9055b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 803 additions and 100 deletions

View file

@ -589,8 +589,9 @@ over time and return an instant vector with per-series aggregation results:
Note that all values in the specified interval have the same weight in the
aggregation even if the values are not equally spaced throughout the interval.
`count_over_time`, `last_over_time`, and `present_over_time` handle native
histograms as expected. All other functions ignore histogram samples.
`avg_over_time`, `sum_over_time`, `count_over_time`, `last_over_time`, and
`present_over_time` handle native histograms as expected. All other functions
ignore histogram samples.
## Trigonometric Functions

View file

@ -318,19 +318,23 @@ histograms is still very limited.
Logical/set binary operators work as expected even if histogram samples are
involved. They only check for the existence of a vector element and don't
change their behavior depending on the sample type of an element (float or
histogram).
histogram). The `count` aggregation operator works similarly.
The binary `+` operator between two native histograms and the `sum` aggregation
operator to aggregate native histograms are fully supported. Even if the
histograms involved have different bucket layouts, the buckets are
automatically converted appropriately so that the operation can be
The binary `+` and `-` operators between two native histograms and the `sum`
and `avg` aggregation operators to aggregate native histograms are fully
supported. Even if the histograms involved have different bucket layouts, the
buckets are automatically converted appropriately so that the operation can be
performed. (With the currently supported bucket schemas, that's always
possible.) If either operator has to sum up a mix of histogram samples and
possible.) If either operator has to aggregate a mix of histogram samples and
float samples, the corresponding vector element is removed from the output
vector entirely.
All other operators do not behave in a meaningful way. They either treat the
histogram sample as if it were a float sample of value 0, or (in case of
arithmetic operations between a scalar and a vector) they leave the histogram
sample unchanged. This behavior will change to a meaningful one before native
histograms are a stable feature.
The binary `*` operator works between a native histogram and a float in any
order, while the binary `/` operator can be used between a native histogram
and a float in that exact order.
All other operators (and unmentioned cases for the above operators) do not
behave in a meaningful way. They either treat the histogram sample as if it
were a float sample of value 0, or (in case of arithmetic operations between a
scalar and a vector) they leave the histogram sample unchanged. This behavior
will change to a meaningful one before native histograms are a stable feature.

View file

@ -159,12 +159,12 @@ func (h *FloatHistogram) ZeroBucket() Bucket[float64] {
}
}
// Scale scales the FloatHistogram by the provided factor, i.e. it scales all
// Mul multiplies the FloatHistogram by the provided factor, i.e. it scales all
// bucket counts including the zero bucket and the count and the sum of
// observations. The bucket layout stays the same. This method changes the
// receiving histogram directly (rather than acting on a copy). It returns a
// pointer to the receiving histogram for convenience.
func (h *FloatHistogram) Scale(factor float64) *FloatHistogram {
func (h *FloatHistogram) Mul(factor float64) *FloatHistogram {
h.ZeroCount *= factor
h.Count *= factor
h.Sum *= factor
@ -177,6 +177,21 @@ func (h *FloatHistogram) Scale(factor float64) *FloatHistogram {
return h
}
// Div works like Scale but divides instead of multiplies.
// When dividing by 0, everything will be set to Inf.
func (h *FloatHistogram) Div(scalar float64) *FloatHistogram {
h.ZeroCount /= scalar
h.Count /= scalar
h.Sum /= scalar
for i := range h.PositiveBuckets {
h.PositiveBuckets[i] /= scalar
}
for i := range h.NegativeBuckets {
h.NegativeBuckets[i] /= scalar
}
return h
}
// Add adds the provided other histogram to the receiving histogram. Count, Sum,
// and buckets from the other histogram are added to the corresponding
// components of the receiving histogram. Buckets in the other histogram that do

View file

@ -15,12 +15,13 @@ package histogram
import (
"fmt"
"math"
"testing"
"github.com/stretchr/testify/require"
)
func TestFloatHistogramScale(t *testing.T) {
func TestFloatHistogramMul(t *testing.T) {
cases := []struct {
name string
in *FloatHistogram
@ -33,6 +34,30 @@ func TestFloatHistogramScale(t *testing.T) {
3.1415,
&FloatHistogram{},
},
{
"zero multiplier",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
0,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 0,
Count: 0,
Sum: 0,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{0, 0, 0, 0},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{0, 0, 0, 0},
},
},
{
"no-op",
&FloatHistogram{
@ -81,17 +106,137 @@ func TestFloatHistogramScale(t *testing.T) {
NegativeBuckets: []float64{6.2, 6, 1.234e5 * 2, 2000},
},
},
{
"triple",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 23,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
3,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 33,
Count: 90,
Sum: 69,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{3, 0, 9, 12, 21},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{9, 3, 15, 18},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.in.Scale(c.scale))
require.Equal(t, c.expected, c.in.Mul(c.scale))
// Has it also happened in-place?
require.Equal(t, c.expected, c.in)
})
}
}
func TestFloatHistogramDiv(t *testing.T) {
cases := []struct {
name string
fh *FloatHistogram
s float64
expected *FloatHistogram
}{
{
"zero value",
&FloatHistogram{},
3.1415,
&FloatHistogram{},
},
{
"zero divisor",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
0,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: math.Inf(1),
Count: math.Inf(1),
Sum: math.Inf(1),
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{math.Inf(1), math.Inf(1), math.Inf(1), math.Inf(1)},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{math.Inf(1), math.Inf(1), math.Inf(1), math.Inf(1)},
},
},
{
"no-op",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
1,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
},
{
"half",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 23,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
2,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 15,
Sum: 11.5,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{1.5, 0.5, 2.5, 3},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.fh.Div(c.s))
// Has it also happened in-place?
require.Equal(t, c.expected, c.fh)
})
}
}
func TestFloatHistogramDetectReset(t *testing.T) {
cases := []struct {
name string

View file

@ -2263,14 +2263,11 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
insertedSigs[insertSig] = struct{}{}
}
if (hl != nil && hr != nil) || (hl == nil && hr == nil) {
// Both lhs and rhs are of same type.
enh.Out = append(enh.Out, Sample{
Metric: metric,
F: floatValue,
H: histogramValue,
})
}
enh.Out = append(enh.Out, Sample{
Metric: metric,
F: floatValue,
H: histogramValue,
})
}
return enh.Out
}
@ -2337,28 +2334,33 @@ func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.V
// VectorscalarBinop evaluates a binary operation between a Vector and a Scalar.
func (ev *evaluator) VectorscalarBinop(op parser.ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper) Vector {
for _, lhsSample := range lhs {
lv, rv := lhsSample.F, rhs.V
lf, rf := lhsSample.F, rhs.V
var rh *histogram.FloatHistogram
lh := lhsSample.H
// lhs always contains the Vector. If the original position was different
// swap for calculating the value.
if swap {
lv, rv = rv, lv
lf, rf = rf, lf
lh, rh = rh, lh
}
value, _, keep := vectorElemBinop(op, lv, rv, nil, nil)
float, histogram, keep := vectorElemBinop(op, lf, rf, lh, rh)
// Catch cases where the scalar is the LHS in a scalar-vector comparison operation.
// We want to always keep the vector element value as the output value, even if it's on the RHS.
if op.IsComparisonOperator() && swap {
value = rv
float = rf
histogram = rh
}
if returnBool {
if keep {
value = 1.0
float = 1.0
} else {
value = 0.0
float = 0.0
}
keep = true
}
if keep {
lhsSample.F = value
lhsSample.F = float
lhsSample.H = histogram
if shouldDropMetricName(op) || returnBool {
lhsSample.Metric = enh.DropMetricName(lhsSample.Metric)
}
@ -2413,16 +2415,33 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
// The histogram being added must have the larger schema
// code (i.e. the higher resolution).
if hrhs.Schema >= hlhs.Schema {
return 0, hlhs.Copy().Add(hrhs), true
return 0, hlhs.Copy().Add(hrhs).Compact(0), true
}
return 0, hrhs.Copy().Add(hlhs), true
return 0, hrhs.Copy().Add(hlhs).Compact(0), true
}
return lhs + rhs, nil, true
case parser.SUB:
if hlhs != nil && hrhs != nil {
// The histogram being subtracted must have the larger schema
// code (i.e. the higher resolution).
if hrhs.Schema >= hlhs.Schema {
return 0, hlhs.Copy().Sub(hrhs).Compact(0), true
}
return 0, hrhs.Copy().Mul(-1).Add(hlhs).Compact(0), true
}
return lhs - rhs, nil, true
case parser.MUL:
if hlhs != nil && hrhs == nil {
return 0, hlhs.Copy().Mul(rhs), true
}
if hlhs == nil && hrhs != nil {
return 0, hrhs.Copy().Mul(lhs), true
}
return lhs * rhs, nil, true
case parser.DIV:
if hlhs != nil && hrhs == nil {
return 0, hlhs.Copy().Div(rhs), true
}
return lhs / rhs, nil, true
case parser.POW:
return math.Pow(lhs, rhs), nil, true
@ -2452,7 +2471,8 @@ type groupedAggregation struct {
labels labels.Labels
floatValue float64
histogramValue *histogram.FloatHistogram
mean float64
floatMean float64
histogramMean *histogram.FloatHistogram
groupCount int
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
@ -2536,7 +2556,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
newAgg := &groupedAggregation{
labels: m,
floatValue: s.F,
mean: s.F,
floatMean: s.F,
groupCount: 1,
}
switch {
@ -2545,6 +2565,11 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case op == parser.SUM:
newAgg.histogramValue = s.H.Copy()
newAgg.hasHistogram = true
case op == parser.AVG:
newAgg.histogramMean = s.H.Copy()
newAgg.hasHistogram = true
case op == parser.STDVAR || op == parser.STDDEV:
newAgg.groupCount = 0
}
result[groupingKey] = newAgg
@ -2589,9 +2614,7 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
if s.H.Schema >= group.histogramValue.Schema {
group.histogramValue.Add(s.H)
} else {
h := s.H.Copy()
h.Add(group.histogramValue)
group.histogramValue = h
group.histogramValue = s.H.Copy().Add(group.histogramValue)
}
}
// Otherwise the aggregation contained floats
@ -2604,25 +2627,46 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.AVG:
group.groupCount++
if math.IsInf(group.mean, 0) {
if math.IsInf(s.F, 0) && (group.mean > 0) == (s.F > 0) {
// The `mean` and `s.V` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `mean` is correct
// already.
break
if s.H != nil {
group.hasHistogram = true
if group.histogramMean != nil {
left := s.H.Copy().Div(float64(group.groupCount))
right := group.histogramMean.Copy().Div(float64(group.groupCount))
// The histogram being added/subtracted must have
// an equal or larger schema.
if s.H.Schema >= group.histogramMean.Schema {
toAdd := right.Mul(-1).Add(left)
group.histogramMean.Add(toAdd)
} else {
toAdd := left.Sub(right)
group.histogramMean = toAdd.Add(group.histogramMean)
}
}
if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) {
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
// This is required because our calculation below removes
// the mean value, which would look like Inf += x - Inf and
// end up as a NaN.
break
// Otherwise the aggregation contained floats
// previously and will be invalid anyway. No
// point in copying the histogram in that case.
} else {
group.hasFloat = true
if math.IsInf(group.floatMean, 0) {
if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) {
// The `floatMean` and `s.F` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `floatMean` is correct
// already.
break
}
if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) {
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
// This is required because our calculation below removes
// the mean value, which would look like Inf += x - Inf and
// end up as a NaN.
break
}
}
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
}
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
group.mean += s.F/float64(group.groupCount) - group.mean/float64(group.groupCount)
case parser.GROUP:
// Do nothing. Required to avoid the panic in `default:` below.
@ -2641,10 +2685,12 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
group.groupCount++
case parser.STDVAR, parser.STDDEV:
group.groupCount++
delta := s.F - group.mean
group.mean += delta / float64(group.groupCount)
group.floatValue += delta * (s.F - group.mean)
if s.H == nil { // Ignore native histograms.
group.groupCount++
delta := s.F - group.floatMean
group.floatMean += delta / float64(group.groupCount)
group.floatValue += delta * (s.F - group.floatMean)
}
case parser.TOPK:
// We build a heap of up to k elements, with the smallest element at heap[0].
@ -2696,7 +2742,16 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
for _, aggr := range orderedResult {
switch op {
case parser.AVG:
aggr.floatValue = aggr.mean
if aggr.hasFloat && aggr.hasHistogram {
// We cannot aggregate histogram sample with a float64 sample.
// TODO(zenador): Issue warning when plumbing is in place.
continue
}
if aggr.hasHistogram {
aggr.histogramValue = aggr.histogramMean.Compact(0)
} else {
aggr.floatValue = aggr.floatMean
}
case parser.COUNT, parser.COUNT_VALUES:
aggr.floatValue = float64(aggr.groupCount)
@ -2739,8 +2794,12 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.SUM:
if aggr.hasFloat && aggr.hasHistogram {
// We cannot aggregate histogram sample with a float64 sample.
// TODO(zenador): Issue warning when plumbing is in place.
continue
}
if aggr.hasHistogram {
aggr.histogramValue.Compact(0)
}
default:
// For other aggregations, we already have the right value.
}

View file

@ -3966,21 +3966,23 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) {
}
}
func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
cases := []struct {
histograms []histogram.Histogram
expected histogram.FloatHistogram
histograms []histogram.Histogram
expected histogram.FloatHistogram
expectedAvg histogram.FloatHistogram
}{
{
histograms: []histogram.Histogram{
{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
@ -3992,6 +3994,182 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
},
NegativeBuckets: []int64{2, 2, -3, 8},
},
{
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 36,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
{
CounterResetHint: histogram.GaugeType,
Schema: 0,
Count: 36,
Sum: 1111.1,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
{
CounterResetHint: histogram.GaugeType,
Schema: 1, // Everything is 0 just to make the count 4 so avg has nicer numbers.
},
},
expected: histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 0,
ZeroThreshold: 0.001,
ZeroCount: 14,
Count: 93,
Sum: 4691.2,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 7},
},
PositiveBuckets: []float64{3, 8, 2, 5, 3, 2, 2},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 6},
{Offset: 3, Length: 3},
},
NegativeBuckets: []float64{2, 6, 8, 4, 15, 9, 10, 10, 4},
},
expectedAvg: histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 0,
ZeroThreshold: 0.001,
ZeroCount: 3.5,
Count: 23.25,
Sum: 1172.8,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 7},
},
PositiveBuckets: []float64{0.75, 2, 0.5, 1.25, 0.75, 0.5, 0.5},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 6},
{Offset: 3, Length: 3},
},
NegativeBuckets: []float64{0.5, 1.5, 2, 1, 3.75, 2.25, 2.5, 2.5, 1},
},
},
}
idx0 := int64(0)
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
test, err := NewTest(t, "")
require.NoError(t, err)
t.Cleanup(test.Close)
seriesName := "sparse_histogram_series"
seriesNameOverTime := "sparse_histogram_series_over_time"
engine := test.QueryEngine()
ts := idx0 * int64(10*time.Minute/time.Millisecond)
app := test.Storage().Appender(context.TODO())
for idx1, h := range c.histograms {
lbls := labels.FromStrings("__name__", seriesName, "idx", fmt.Sprintf("%d", idx1))
// Since we mutate h later, we need to create a copy here.
if floatHisto {
_, err = app.AppendHistogram(0, lbls, ts, nil, h.Copy().ToFloat())
} else {
_, err = app.AppendHistogram(0, lbls, ts, h.Copy(), nil)
}
require.NoError(t, err)
lbls = labels.FromStrings("__name__", seriesNameOverTime)
newTs := ts + int64(idx1)*int64(time.Minute/time.Millisecond)
// Since we mutate h later, we need to create a copy here.
if floatHisto {
_, err = app.AppendHistogram(0, lbls, newTs, nil, h.Copy().ToFloat())
} else {
_, err = app.AppendHistogram(0, lbls, newTs, h.Copy(), nil)
}
require.NoError(t, err)
}
require.NoError(t, app.Commit())
queryAndCheck := func(queryString string, ts int64, exp Vector) {
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err)
res := qry.Exec(test.Context())
require.NoError(t, res.Err)
vector, err := res.Vector()
require.NoError(t, err)
require.Equal(t, exp, vector)
}
// sum().
queryString := fmt.Sprintf("sum(%s)", seriesName)
queryAndCheck(queryString, ts, []Sample{{T: ts, H: &c.expected, Metric: labels.EmptyLabels()}})
// + operator.
queryString = fmt.Sprintf(`%s{idx="0"}`, seriesName)
for idx := 1; idx < len(c.histograms); idx++ {
queryString += fmt.Sprintf(` + ignoring(idx) %s{idx="%d"}`, seriesName, idx)
}
queryAndCheck(queryString, ts, []Sample{{T: ts, H: &c.expected, Metric: labels.EmptyLabels()}})
// count().
queryString = fmt.Sprintf("count(%s)", seriesName)
queryAndCheck(queryString, ts, []Sample{{T: ts, F: 4, Metric: labels.EmptyLabels()}})
// avg().
queryString = fmt.Sprintf("avg(%s)", seriesName)
queryAndCheck(queryString, ts, []Sample{{T: ts, H: &c.expectedAvg, Metric: labels.EmptyLabels()}})
offset := int64(len(c.histograms) - 1)
newTs := ts + offset*int64(time.Minute/time.Millisecond)
// sum_over_time().
queryString = fmt.Sprintf("sum_over_time(%s[%dm:1m])", seriesNameOverTime, offset)
queryAndCheck(queryString, newTs, []Sample{{T: newTs, H: &c.expected, Metric: labels.EmptyLabels()}})
// avg_over_time().
queryString = fmt.Sprintf("avg_over_time(%s[%dm:1m])", seriesNameOverTime, offset)
queryAndCheck(queryString, newTs, []Sample{{T: newTs, H: &c.expectedAvg, Metric: labels.EmptyLabels()}})
})
idx0++
}
}
}
func TestNativeHistogram_SubOperator(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
cases := []struct {
histograms []histogram.Histogram
expected histogram.FloatHistogram
}{
{
histograms: []histogram.Histogram{
{
Schema: 0,
Count: 36,
@ -4011,10 +4189,117 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
{
Schema: 0,
Count: 11,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 3,
PositiveSpans: []histogram.Span{
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, -1},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
NegativeBuckets: []int64{3, -1},
},
},
expected: histogram.FloatHistogram{
Schema: 0,
Count: 25,
Sum: 1111.1,
ZeroThreshold: 0.001,
ZeroCount: 2,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 4},
},
PositiveBuckets: []float64{1, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 2},
{Offset: 1, Length: 1},
{Offset: 4, Length: 3},
},
NegativeBuckets: []float64{1, 1, 7, 5, 5, 2},
},
},
{
histograms: []histogram.Histogram{
{
Schema: 0,
Count: 36,
Sum: 1111.1,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
{
Schema: 1,
Count: 11,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 3,
PositiveSpans: []histogram.Span{
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, -1},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
NegativeBuckets: []int64{3, -1},
},
},
expected: histogram.FloatHistogram{
Schema: 0,
Count: 25,
Sum: 1111.1,
ZeroThreshold: 0.001,
ZeroCount: 2,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 1, Length: 5},
},
PositiveBuckets: []float64{1, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 4, Length: 3},
},
NegativeBuckets: []float64{-2, 2, 2, 7, 5, 5, 2},
},
},
{
histograms: []histogram.Histogram{
{
Schema: 1,
Count: 11,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 3,
PositiveSpans: []histogram.Span{
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, -1},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
NegativeBuckets: []int64{3, -1},
},
{
Schema: 0,
Count: 36,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
@ -4033,21 +4318,20 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
},
expected: histogram.FloatHistogram{
Schema: 0,
Count: -25,
Sum: -1111.1,
ZeroThreshold: 0.001,
ZeroCount: 14,
Count: 93,
Sum: 4691.2,
ZeroCount: -2,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 0, Length: 4},
{Offset: 0, Length: 1},
{Offset: 1, Length: 5},
},
PositiveBuckets: []float64{3, 8, 2, 5, 3, 2, 2},
PositiveBuckets: []float64{-1, -1, -2, -1, -1, -1},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 2},
{Offset: 3, Length: 3},
{Offset: 1, Length: 4},
{Offset: 4, Length: 3},
},
NegativeBuckets: []float64{2, 6, 8, 4, 15, 9, 10, 10, 4},
NegativeBuckets: []float64{2, -2, -2, -7, -5, -5, -2},
},
},
}
@ -4091,20 +4375,177 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
require.Equal(t, exp, vector)
}
// sum().
queryString := fmt.Sprintf("sum(%s)", seriesName)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expected, Metric: labels.EmptyLabels()}})
// + operator.
queryString = fmt.Sprintf(`%s{idx="0"}`, seriesName)
// - operator.
queryString := fmt.Sprintf(`%s{idx="0"}`, seriesName)
for idx := 1; idx < len(c.histograms); idx++ {
queryString += fmt.Sprintf(` + ignoring(idx) %s{idx="%d"}`, seriesName, idx)
queryString += fmt.Sprintf(` - ignoring(idx) %s{idx="%d"}`, seriesName, idx)
}
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expected, Metric: labels.EmptyLabels()}})
})
idx0++
}
}
}
// count().
queryString = fmt.Sprintf("count(%s)", seriesName)
queryAndCheck(queryString, []Sample{{T: ts, F: 3, Metric: labels.EmptyLabels()}})
func TestNativeHistogram_MulDivOperator(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
originalHistogram := histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 33,
ZeroThreshold: 0.001,
ZeroCount: 3,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{3, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []int64{3, 0, 0},
}
cases := []struct {
scalar float64
histogram histogram.Histogram
expectedMul histogram.FloatHistogram
expectedDiv histogram.FloatHistogram
}{
{
scalar: 3,
histogram: originalHistogram,
expectedMul: histogram.FloatHistogram{
Schema: 0,
Count: 63,
Sum: 99,
ZeroThreshold: 0.001,
ZeroCount: 9,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{9, 9, 9},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []float64{9, 9, 9},
},
expectedDiv: histogram.FloatHistogram{
Schema: 0,
Count: 7,
Sum: 11,
ZeroThreshold: 0.001,
ZeroCount: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []float64{1, 1, 1},
},
},
{
scalar: 0,
histogram: originalHistogram,
expectedMul: histogram.FloatHistogram{
Schema: 0,
Count: 0,
Sum: 0,
ZeroThreshold: 0.001,
ZeroCount: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{0, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []float64{0, 0, 0},
},
expectedDiv: histogram.FloatHistogram{
Schema: 0,
Count: math.Inf(1),
Sum: math.Inf(1),
ZeroThreshold: 0.001,
ZeroCount: math.Inf(1),
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{math.Inf(1), math.Inf(1), math.Inf(1)},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 3},
},
NegativeBuckets: []float64{math.Inf(1), math.Inf(1), math.Inf(1)},
},
},
}
idx0 := int64(0)
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
test, err := NewTest(t, "")
require.NoError(t, err)
t.Cleanup(test.Close)
seriesName := "sparse_histogram_series"
floatSeriesName := "float_series"
engine := test.QueryEngine()
ts := idx0 * int64(10*time.Minute/time.Millisecond)
app := test.Storage().Appender(context.TODO())
h := c.histogram
lbls := labels.FromStrings("__name__", seriesName)
// Since we mutate h later, we need to create a copy here.
if floatHisto {
_, err = app.AppendHistogram(0, lbls, ts, nil, h.Copy().ToFloat())
} else {
_, err = app.AppendHistogram(0, lbls, ts, h.Copy(), nil)
}
require.NoError(t, err)
_, err = app.Append(0, labels.FromStrings("__name__", floatSeriesName), ts, c.scalar)
require.NoError(t, err)
require.NoError(t, app.Commit())
queryAndCheck := func(queryString string, exp Vector) {
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err)
res := qry.Exec(test.Context())
require.NoError(t, res.Err)
vector, err := res.Vector()
require.NoError(t, err)
require.Equal(t, exp, vector)
}
// histogram * scalar.
queryString := fmt.Sprintf(`%s * %f`, seriesName, c.scalar)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expectedMul, Metric: labels.EmptyLabels()}})
// scalar * histogram.
queryString = fmt.Sprintf(`%f * %s`, c.scalar, seriesName)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expectedMul, Metric: labels.EmptyLabels()}})
// histogram * float.
queryString = fmt.Sprintf(`%s * %s`, seriesName, floatSeriesName)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expectedMul, Metric: labels.EmptyLabels()}})
// float * histogram.
queryString = fmt.Sprintf(`%s * %s`, floatSeriesName, seriesName)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expectedMul, Metric: labels.EmptyLabels()}})
// histogram / scalar.
queryString = fmt.Sprintf(`%s / %f`, seriesName, c.scalar)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expectedDiv, Metric: labels.EmptyLabels()}})
// histogram / float.
queryString = fmt.Sprintf(`%s / %s`, seriesName, floatSeriesName)
queryAndCheck(queryString, []Sample{{T: ts, H: &c.expectedDiv, Metric: labels.EmptyLabels()}})
})
idx0++
}

View file

@ -162,7 +162,7 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
if resultHistogram == nil {
resultFloat *= factor
} else {
resultHistogram.Scale(factor)
resultHistogram.Mul(factor)
}
return append(enh.Out, Sample{F: resultFloat, H: resultHistogram})
@ -443,15 +443,40 @@ func aggrOverTime(vals []parser.Value, enh *EvalNodeHelper, aggrFn func(Series)
return append(enh.Out, Sample{F: aggrFn(el)})
}
func aggrHistOverTime(vals []parser.Value, enh *EvalNodeHelper, aggrFn func(Series) *histogram.FloatHistogram) Vector {
el := vals[0].(Matrix)[0]
return append(enh.Out, Sample{H: aggrFn(el)})
}
// === avg_over_time(Matrix parser.ValueTypeMatrix) Vector ===
func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
if len(vals[0].(Matrix)[0].Floats) == 0 {
// TODO(beorn7): The passed values only contain
// histograms. avg_over_time ignores histograms for now. If
// there are only histograms, we have to return without adding
// anything to enh.Out.
if len(vals[0].(Matrix)[0].Floats) > 0 && len(vals[0].(Matrix)[0].Histograms) > 0 {
// TODO(zenador): Add warning for mixed floats and histograms.
return enh.Out
}
if len(vals[0].(Matrix)[0].Floats) == 0 {
// The passed values only contain histograms.
return aggrHistOverTime(vals, enh, func(s Series) *histogram.FloatHistogram {
count := 1
mean := s.Histograms[0].H.Copy()
for _, h := range s.Histograms[1:] {
count++
left := h.H.Copy().Div(float64(count))
right := mean.Copy().Div(float64(count))
// The histogram being added/subtracted must have
// an equal or larger schema.
if h.H.Schema >= mean.Schema {
toAdd := right.Mul(-1).Add(left)
mean.Add(toAdd)
} else {
toAdd := left.Sub(right)
mean = toAdd.Add(mean)
}
}
return mean
})
}
return aggrOverTime(vals, enh, func(s Series) float64 {
var mean, count, c float64
for _, f := range s.Floats {
@ -558,13 +583,26 @@ func funcMinOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
// === sum_over_time(Matrix parser.ValueTypeMatrix) Vector ===
func funcSumOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
if len(vals[0].(Matrix)[0].Floats) == 0 {
// TODO(beorn7): The passed values only contain
// histograms. sum_over_time ignores histograms for now. If
// there are only histograms, we have to return without adding
// anything to enh.Out.
if len(vals[0].(Matrix)[0].Floats) > 0 && len(vals[0].(Matrix)[0].Histograms) > 0 {
// TODO(zenador): Add warning for mixed floats and histograms.
return enh.Out
}
if len(vals[0].(Matrix)[0].Floats) == 0 {
// The passed values only contain histograms.
return aggrHistOverTime(vals, enh, func(s Series) *histogram.FloatHistogram {
sum := s.Histograms[0].H.Copy()
for _, h := range s.Histograms[1:] {
// The histogram being added must have
// an equal or larger schema.
if h.H.Schema >= sum.Schema {
sum.Add(h.H)
} else {
sum = h.H.Copy().Add(sum)
}
}
return sum
})
}
return aggrOverTime(vals, enh, func(s Series) float64 {
var sum, c float64
for _, f := range s.Floats {