Pushdown aggregator group by through read hint (#6401)

* Pushdown aggregator group by through read hint

Implement https://github.com/prometheus/prometheus/issues/6400

* add temporal aggregation pushdown support

Signed-off-by: xiancli <xiancli@ebay.com>
This commit is contained in:
Garrett 2019-12-05 22:06:28 +08:00 committed by Brian Brazil
parent 0ae4899c47
commit 5a9c4acfbf
6 changed files with 269 additions and 39 deletions

View file

@ -364,6 +364,9 @@ type ReadHints struct {
Func string `protobuf:"bytes,2,opt,name=func,proto3" json:"func,omitempty"` Func string `protobuf:"bytes,2,opt,name=func,proto3" json:"func,omitempty"`
StartMs int64 `protobuf:"varint,3,opt,name=start_ms,json=startMs,proto3" json:"start_ms,omitempty"` StartMs int64 `protobuf:"varint,3,opt,name=start_ms,json=startMs,proto3" json:"start_ms,omitempty"`
EndMs int64 `protobuf:"varint,4,opt,name=end_ms,json=endMs,proto3" json:"end_ms,omitempty"` EndMs int64 `protobuf:"varint,4,opt,name=end_ms,json=endMs,proto3" json:"end_ms,omitempty"`
Grouping []string `protobuf:"bytes,5,rep,name=grouping,proto3" json:"grouping,omitempty"`
By bool `protobuf:"varint,6,opt,name=by,proto3" json:"by,omitempty"`
RangeMs int64 `protobuf:"varint,7,opt,name=range_ms,json=rangeMs,proto3" json:"range_ms,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -430,6 +433,27 @@ func (m *ReadHints) GetEndMs() int64 {
return 0 return 0
} }
func (m *ReadHints) GetGrouping() []string {
if m != nil {
return m.Grouping
}
return nil
}
func (m *ReadHints) GetBy() bool {
if m != nil {
return m.By
}
return false
}
func (m *ReadHints) GetRangeMs() int64 {
if m != nil {
return m.RangeMs
}
return 0
}
// Chunk represents a TSDB chunk. // Chunk represents a TSDB chunk.
// Time range [min, max] is inclusive. // Time range [min, max] is inclusive.
type Chunk struct { type Chunk struct {
@ -577,38 +601,41 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{ var fileDescriptor_d938547f84707355 = []byte{
// 496 bytes of a gzipped FileDescriptorProto // 539 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xcd, 0x6e, 0xd3, 0x4c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x14, 0xed, 0xd8, 0x89, 0xd3, 0xdc, 0xf4, 0xfb, 0xe4, 0x8e, 0x82, 0x08, 0x15, 0x84, 0xc8, 0xab, 0x10, 0xee, 0xda, 0x89, 0x9d, 0x4c, 0x4a, 0x95, 0xae, 0x8a, 0x30, 0x15, 0x04, 0xcb, 0x27, 0x9f,
0xac, 0x1c, 0x35, 0xac, 0x90, 0x58, 0x15, 0x59, 0x42, 0xa2, 0x4e, 0xd5, 0x69, 0x11, 0x88, 0x4d, 0x5c, 0x35, 0x9c, 0x90, 0x38, 0x15, 0x45, 0x42, 0xa2, 0x4e, 0xd5, 0x6d, 0x11, 0x88, 0x4b, 0xb5,
0x35, 0x89, 0x87, 0xc4, 0x90, 0x19, 0xbb, 0x9e, 0x09, 0x6a, 0x1f, 0x84, 0xc7, 0xe0, 0x3d, 0xba, 0x89, 0x17, 0xc7, 0x22, 0x5e, 0xbb, 0xde, 0x0d, 0x6a, 0x1e, 0x84, 0xc7, 0xe0, 0xc0, 0x5b, 0xf4,
0xe4, 0x09, 0x10, 0xca, 0x93, 0xa0, 0xb9, 0xb6, 0xeb, 0x48, 0x65, 0x03, 0xbb, 0xfb, 0x73, 0xce, 0xc8, 0x13, 0x20, 0x94, 0x27, 0x41, 0x3b, 0x76, 0x7e, 0xa4, 0x72, 0x81, 0xdb, 0xfc, 0x7c, 0xf3,
0x9c, 0x93, 0x93, 0x6b, 0xe8, 0x99, 0xdb, 0x5c, 0xe8, 0x30, 0x2f, 0x32, 0x93, 0x51, 0xc8, 0x8b, 0x7d, 0x9f, 0x77, 0xc6, 0xd0, 0xd3, 0xcb, 0x52, 0xa8, 0xa8, 0xac, 0x0a, 0x5d, 0x50, 0x28, 0xab,
0x4c, 0x0a, 0xb3, 0x12, 0x1b, 0x7d, 0xd4, 0x5f, 0x66, 0xcb, 0x0c, 0xc7, 0x13, 0x5b, 0x95, 0x88, 0x22, 0x17, 0x7a, 0x26, 0x16, 0xea, 0xf8, 0x28, 0x2d, 0xd2, 0x02, 0xcb, 0x27, 0x26, 0xaa, 0x11,
0xe0, 0x15, 0x78, 0x17, 0x5c, 0xe6, 0x6b, 0x41, 0xfb, 0xd0, 0xfe, 0xca, 0xd7, 0x1b, 0x31, 0x20, 0xc1, 0x6b, 0x70, 0xae, 0x78, 0x5e, 0xce, 0x05, 0x3d, 0x82, 0xf6, 0x57, 0x3e, 0x5f, 0x08, 0x8f,
0x23, 0x32, 0x26, 0xac, 0x6c, 0xe8, 0x53, 0xe8, 0x9a, 0x54, 0x0a, 0x6d, 0xb8, 0xcc, 0x07, 0xce, 0xf8, 0x24, 0x24, 0xac, 0x4e, 0xe8, 0x33, 0xe8, 0xea, 0x2c, 0x17, 0x4a, 0xf3, 0xbc, 0xf4, 0x2c,
0x88, 0x8c, 0x5d, 0xd6, 0x0c, 0x82, 0x6b, 0x80, 0xcb, 0x54, 0x8a, 0x0b, 0x51, 0xa4, 0x42, 0xd3, 0x9f, 0x84, 0x36, 0xdb, 0x16, 0x82, 0x5b, 0x80, 0xeb, 0x2c, 0x17, 0x57, 0xa2, 0xca, 0x84, 0xa2,
0x09, 0x78, 0x6b, 0x3e, 0x17, 0x6b, 0x3d, 0x20, 0x23, 0x77, 0xdc, 0x9b, 0x1e, 0x86, 0x8d, 0x7c, 0x27, 0xe0, 0xcc, 0xf9, 0x44, 0xcc, 0x95, 0x47, 0x7c, 0x3b, 0xec, 0x0d, 0x0f, 0xa3, 0xad, 0x7c,
0x78, 0x6a, 0x37, 0x27, 0xad, 0xbb, 0x9f, 0xcf, 0xf7, 0x58, 0x05, 0xa3, 0x53, 0xe8, 0x68, 0x14, 0x74, 0x6e, 0x3a, 0x67, 0xad, 0xfb, 0x5f, 0x2f, 0xf6, 0x58, 0x03, 0xa3, 0x43, 0x70, 0x15, 0x8a,
0xd7, 0x03, 0x07, 0x19, 0x74, 0x97, 0x51, 0xfa, 0xaa, 0x28, 0x35, 0x30, 0x38, 0x86, 0x36, 0x3e, 0x2b, 0xcf, 0xc2, 0x09, 0xba, 0x3b, 0x51, 0xfb, 0x6a, 0x46, 0xd6, 0xc0, 0xe0, 0x14, 0xda, 0x48,
0x45, 0x29, 0xb4, 0x14, 0x97, 0xa5, 0xdd, 0x2e, 0xc3, 0xba, 0xf9, 0x0d, 0x0e, 0x0e, 0xcb, 0x26, 0x45, 0x29, 0xb4, 0x24, 0xcf, 0x6b, 0xbb, 0x5d, 0x86, 0xf1, 0xf6, 0x1b, 0x2c, 0x2c, 0xd6, 0x49,
0x78, 0x09, 0xde, 0x69, 0x29, 0xf8, 0xb7, 0x0e, 0x83, 0x6f, 0x04, 0x0e, 0x70, 0x1e, 0x73, 0xb3, 0xf0, 0x0a, 0x9c, 0xf3, 0x5a, 0xf0, 0x5f, 0x1d, 0x06, 0xdf, 0x08, 0xec, 0x63, 0x3d, 0xe6, 0x7a,
0x58, 0x89, 0x82, 0x1e, 0x43, 0xcb, 0x06, 0x8c, 0xaa, 0xff, 0x4f, 0x9f, 0x3d, 0xe0, 0x57, 0xb8, 0x3a, 0x13, 0x15, 0x3d, 0x85, 0x96, 0x79, 0x60, 0x54, 0x3d, 0x18, 0x3e, 0x7f, 0x30, 0xdf, 0xe0,
0xf0, 0xf2, 0x36, 0x17, 0x0c, 0xa1, 0xf7, 0x46, 0x9d, 0x3f, 0x19, 0x75, 0x77, 0x8d, 0x8e, 0xa1, 0xa2, 0xeb, 0x65, 0x29, 0x18, 0x42, 0x37, 0x46, 0xad, 0xbf, 0x19, 0xb5, 0x77, 0x8d, 0x86, 0xd0,
0x65, 0x79, 0xd4, 0x03, 0x27, 0x3a, 0xf7, 0xf7, 0x68, 0x07, 0xdc, 0x59, 0x74, 0xee, 0x13, 0x3b, 0x32, 0x73, 0xd4, 0x01, 0x6b, 0x74, 0xd9, 0xdf, 0xa3, 0x2e, 0xd8, 0xe3, 0xd1, 0x65, 0x9f, 0x98,
0x60, 0x91, 0xef, 0xe0, 0x80, 0x45, 0xbe, 0x1b, 0x7c, 0x86, 0x2e, 0x13, 0x3c, 0x79, 0x93, 0x2a, 0x02, 0x1b, 0xf5, 0x2d, 0x2c, 0xb0, 0x51, 0xdf, 0x0e, 0x7e, 0x10, 0xe8, 0x32, 0xc1, 0x93, 0xb7,
0xa3, 0xe9, 0x63, 0xe8, 0x68, 0x23, 0xf2, 0x2b, 0xa9, 0xd1, 0x96, 0xcb, 0x3c, 0xdb, 0xc6, 0xda, 0x99, 0xd4, 0x8a, 0x3e, 0x01, 0x57, 0x69, 0x51, 0xde, 0xe4, 0x0a, 0x7d, 0xd9, 0xcc, 0x31, 0x69,
0x2a, 0x7f, 0xda, 0xa8, 0x45, 0xad, 0x6c, 0x6b, 0xfa, 0x04, 0xf6, 0xb5, 0xe1, 0x85, 0xb1, 0x68, 0xac, 0x8c, 0xf4, 0xe7, 0x85, 0x9c, 0xae, 0xa5, 0x4d, 0x4c, 0x9f, 0x42, 0x47, 0x69, 0x5e, 0x69,
0x17, 0xd1, 0x1d, 0xec, 0x63, 0x4d, 0x1f, 0x81, 0x27, 0x54, 0x62, 0x17, 0x2d, 0x5c, 0xb4, 0x85, 0x83, 0xb6, 0x11, 0xed, 0x62, 0x1e, 0x2b, 0xfa, 0x18, 0x1c, 0x21, 0x13, 0xd3, 0x68, 0x61, 0xa3,
0x4a, 0x62, 0x1d, 0x7c, 0x27, 0xd0, 0x7e, 0xbd, 0xda, 0xa8, 0x2f, 0x74, 0x08, 0x3d, 0x99, 0xaa, 0x2d, 0x64, 0x12, 0x2b, 0x7a, 0x0c, 0x9d, 0xb4, 0x2a, 0x16, 0x65, 0x26, 0x53, 0xaf, 0xed, 0xdb,
0x2b, 0xfb, 0xff, 0x37, 0x62, 0x5d, 0x99, 0x2a, 0x7b, 0x04, 0xb1, 0xc6, 0x3d, 0xbf, 0xb9, 0xdf, 0x61, 0x97, 0x6d, 0x72, 0x7a, 0x00, 0xd6, 0x64, 0xe9, 0x39, 0x3e, 0x09, 0x3b, 0xcc, 0x9a, 0x2c,
0x57, 0xe7, 0x22, 0xf9, 0x4d, 0xb5, 0x0f, 0xab, 0xf0, 0x5c, 0x0c, 0xef, 0x68, 0x37, 0x3c, 0x14, 0x0d, 0x7b, 0xc5, 0x65, 0x2a, 0x0c, 0x89, 0x5b, 0xb3, 0x63, 0x1e, 0xab, 0xe0, 0x3b, 0x81, 0xf6,
0x08, 0x23, 0xb5, 0xc8, 0x92, 0x54, 0x2d, 0x9b, 0xe4, 0x12, 0x6e, 0x38, 0xda, 0x39, 0x60, 0x58, 0x9b, 0xd9, 0x42, 0x7e, 0xa1, 0x03, 0xe8, 0xe5, 0x99, 0xbc, 0x31, 0x77, 0xb4, 0xf5, 0xdc, 0xcd,
0x07, 0x23, 0xd8, 0xaf, 0x51, 0xb4, 0x07, 0x9d, 0x77, 0xb3, 0xb7, 0xb3, 0xb3, 0xf7, 0xb3, 0x32, 0x33, 0x69, 0x8e, 0x29, 0x56, 0xd8, 0xe7, 0x77, 0x9b, 0x7e, 0x73, 0x76, 0x39, 0xbf, 0x6b, 0xfa,
0xac, 0x0f, 0x67, 0xcc, 0x27, 0xc1, 0x35, 0xfc, 0x87, 0xaf, 0x89, 0xe4, 0x5f, 0xef, 0x72, 0x02, 0x51, 0xb3, 0x04, 0x1b, 0x97, 0x70, 0xbc, 0xbb, 0x04, 0x14, 0x88, 0x46, 0x72, 0x5a, 0x24, 0x99,
0xde, 0xc2, 0xbe, 0x50, 0x9f, 0xe5, 0xe1, 0x03, 0xa7, 0x35, 0xa1, 0x84, 0x9d, 0xf4, 0xef, 0xb6, 0x4c, 0xb7, 0x1b, 0x48, 0xb8, 0xe6, 0xf8, 0x55, 0xfb, 0x0c, 0xe3, 0xc0, 0x87, 0xce, 0x1a, 0x45,
0x43, 0xf2, 0x63, 0x3b, 0x24, 0xbf, 0xb6, 0x43, 0xf2, 0xd1, 0xb3, 0xe8, 0x7c, 0x3e, 0xf7, 0xf0, 0x7b, 0xe0, 0xbe, 0x1f, 0xbf, 0x1b, 0x5f, 0x7c, 0x18, 0xd7, 0x8f, 0xfe, 0xf1, 0x82, 0xf5, 0x49,
0x13, 0x7b, 0xf1, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x1e, 0x9b, 0x28, 0x1b, 0x93, 0x03, 0x00, 0x00, 0x70, 0x0b, 0x8f, 0x90, 0x4d, 0x24, 0xff, 0x7b, 0xdf, 0x27, 0xe0, 0x4c, 0x0d, 0xc3, 0xfa, 0xbc,
0x0f, 0x1f, 0x38, 0x5d, 0x0f, 0xd4, 0xb0, 0xb3, 0xa3, 0xfb, 0xd5, 0x80, 0xfc, 0x5c, 0x0d, 0xc8,
0xef, 0xd5, 0x80, 0x7c, 0x72, 0x0c, 0xba, 0x9c, 0x4c, 0x1c, 0xfc, 0x55, 0x5f, 0xfe, 0x09, 0x00,
0x00, 0xff, 0xff, 0xed, 0x99, 0x84, 0x88, 0xdb, 0x03, 0x00, 0x00,
} }
func (m *Sample) Marshal() (dAtA []byte, err error) { func (m *Sample) Marshal() (dAtA []byte, err error) {
@ -856,6 +883,30 @@ func (m *ReadHints) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized) i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized)
} }
if m.RangeMs != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.RangeMs))
i--
dAtA[i] = 0x38
}
if m.By {
i--
if m.By {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x30
}
if len(m.Grouping) > 0 {
for iNdEx := len(m.Grouping) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Grouping[iNdEx])
copy(dAtA[i:], m.Grouping[iNdEx])
i = encodeVarintTypes(dAtA, i, uint64(len(m.Grouping[iNdEx])))
i--
dAtA[i] = 0x2a
}
}
if m.EndMs != 0 { if m.EndMs != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.EndMs)) i = encodeVarintTypes(dAtA, i, uint64(m.EndMs))
i-- i--
@ -1118,6 +1169,18 @@ func (m *ReadHints) Size() (n int) {
if m.EndMs != 0 { if m.EndMs != 0 {
n += 1 + sovTypes(uint64(m.EndMs)) n += 1 + sovTypes(uint64(m.EndMs))
} }
if len(m.Grouping) > 0 {
for _, s := range m.Grouping {
l = len(s)
n += 1 + l + sovTypes(uint64(l))
}
}
if m.By {
n += 2
}
if m.RangeMs != 0 {
n += 1 + sovTypes(uint64(m.RangeMs))
}
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -1846,6 +1909,77 @@ func (m *ReadHints) Unmarshal(dAtA []byte) error {
break break
} }
} }
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Grouping", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Grouping = append(m.Grouping, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field By", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.By = bool(v != 0)
case 7:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RangeMs", wireType)
}
m.RangeMs = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.RangeMs |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:]) skippy, err := skipTypes(dAtA[iNdEx:])

View file

@ -56,6 +56,9 @@ message ReadHints {
string func = 2; // String representation of surrounding function or aggregation. string func = 2; // String representation of surrounding function or aggregation.
int64 start_ms = 3; // Start time in milliseconds. int64 start_ms = 3; // Start time in milliseconds.
int64 end_ms = 4; // End time in milliseconds. int64 end_ms = 4; // End time in milliseconds.
repeated string grouping = 5; // List of label names used in aggregation.
bool by = 6; // Indicate whether it is without or by.
int64 range_ms = 7; // Range vector selector range in milliseconds.
} }
// Chunk represents a TSDB chunk. // Chunk represents a TSDB chunk.

View file

@ -584,6 +584,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
case *VectorSelector: case *VectorSelector:
params.Start = params.Start - durationMilliseconds(LookbackDelta) params.Start = params.Start - durationMilliseconds(LookbackDelta)
params.Func = extractFuncFromPath(path) params.Func = extractFuncFromPath(path)
params.By, params.Grouping = extractGroupsFromPath(path)
if n.Offset > 0 { if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset) offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds params.Start = params.Start - offsetMilliseconds
@ -600,6 +601,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
case *MatrixSelector: case *MatrixSelector:
params.Func = extractFuncFromPath(path) params.Func = extractFuncFromPath(path)
params.Range = durationMilliseconds(n.Range)
// For all matrix queries we want to ensure that we have (end-start) + range selected // For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time // this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(n.Range) params.Start = params.Start - durationMilliseconds(n.Range)
@ -641,6 +643,18 @@ func extractFuncFromPath(p []Node) string {
return extractFuncFromPath(p[:len(p)-1]) return extractFuncFromPath(p[:len(p)-1])
} }
// extractGroupsFromPath parses vector outer function and extracts grouping information if by or without was used.
func extractGroupsFromPath(p []Node) (bool, []string) {
if len(p) == 0 {
return false, nil
}
switch n := p[len(p)-1].(type) {
case *AggregateExpr:
return !n.Without, n.Grouping
}
return false, nil
}
func checkForSeriesSetExpansion(ctx context.Context, expr Expr) { func checkForSeriesSetExpansion(ctx context.Context, expr Expr) {
switch e := expr.(type) { switch e := expr.(type) {
case *MatrixSelector: case *MatrixSelector:

View file

@ -213,8 +213,12 @@ func TestQueryError(t *testing.T) {
// paramCheckerQuerier implements storage.Querier which checks the start and end times // paramCheckerQuerier implements storage.Querier which checks the start and end times
// in params. // in params.
type paramCheckerQuerier struct { type paramCheckerQuerier struct {
start int64 start int64
end int64 end int64
grouping []string
by bool
selRange int64
function string
t *testing.T t *testing.T
} }
@ -222,6 +226,10 @@ type paramCheckerQuerier struct {
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
testutil.Equals(q.t, q.start, sp.Start) testutil.Equals(q.t, q.start, sp.Start)
testutil.Equals(q.t, q.end, sp.End) testutil.Equals(q.t, q.end, sp.End)
testutil.Equals(q.t, q.grouping, sp.Grouping)
testutil.Equals(q.t, q.by, sp.By)
testutil.Equals(q.t, q.selRange, sp.Range)
testutil.Equals(q.t, q.function, sp.Func)
return errSeriesSet{err: nil}, nil, nil return errSeriesSet{err: nil}, nil, nil
} }
@ -256,6 +264,11 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart int64 paramStart int64
paramEnd int64 paramEnd int64
paramGrouping []string
paramBy bool
paramRange int64
paramFunc string
}{{ }{{
query: "foo", query: "foo",
start: 10, start: 10,
@ -268,12 +281,14 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 80, // 200 - 120 paramStart: 80, // 200 - 120
paramEnd: 200, paramEnd: 200,
paramRange: 120000,
}, { }, {
query: "foo[2m] offset 2m", query: "foo[2m] offset 2m",
start: 300, start: 300,
paramStart: 60, paramStart: 60,
paramEnd: 180, paramEnd: 180,
paramRange: 120000,
}, { }, {
query: "foo[2m:1s]", query: "foo[2m:1s]",
start: 300, start: 300,
@ -286,18 +301,21 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 175, // 300 - 120 - 5 paramStart: 175, // 300 - 120 - 5
paramEnd: 300, paramEnd: 300,
paramFunc: "count_over_time",
}, { }, {
query: "count_over_time(foo[2m:1s] offset 10s)", query: "count_over_time(foo[2m:1s] offset 10s)",
start: 300, start: 300,
paramStart: 165, // 300 - 120 - 5 - 10 paramStart: 165, // 300 - 120 - 5 - 10
paramEnd: 300, paramEnd: 300,
paramFunc: "count_over_time",
}, { }, {
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)", query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
start: 300, start: 300,
paramStart: 155, // 300 - 120 - 5 - 10 - 10 paramStart: 155, // 300 - 120 - 5 - 10 - 10
paramEnd: 290, paramEnd: 290,
paramFunc: "count_over_time",
}, { }, {
// Range queries now. // Range queries now.
query: "foo", query: "foo",
@ -313,6 +331,8 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 80, // 200 - 120 paramStart: 80, // 200 - 120
paramEnd: 500, paramEnd: 500,
paramRange: 120000,
paramFunc: "rate",
}, { }, {
query: "rate(foo[2m] offset 2m)", query: "rate(foo[2m] offset 2m)",
start: 300, start: 300,
@ -320,6 +340,8 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 60, paramStart: 60,
paramEnd: 380, paramEnd: 380,
paramRange: 120000,
paramFunc: "rate",
}, { }, {
query: "rate(foo[2m:1s])", query: "rate(foo[2m:1s])",
start: 300, start: 300,
@ -327,6 +349,7 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 175, // 300 - 120 - 5 paramStart: 175, // 300 - 120 - 5
paramEnd: 500, paramEnd: 500,
paramFunc: "rate",
}, { }, {
query: "count_over_time(foo[2m:1s])", query: "count_over_time(foo[2m:1s])",
start: 300, start: 300,
@ -334,6 +357,7 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 175, // 300 - 120 - 5 paramStart: 175, // 300 - 120 - 5
paramEnd: 500, paramEnd: 500,
paramFunc: "count_over_time",
}, { }, {
query: "count_over_time(foo[2m:1s] offset 10s)", query: "count_over_time(foo[2m:1s] offset 10s)",
start: 300, start: 300,
@ -341,6 +365,7 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 165, // 300 - 120 - 5 - 10 paramStart: 165, // 300 - 120 - 5 - 10
paramEnd: 500, paramEnd: 500,
paramFunc: "count_over_time",
}, { }, {
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)", query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
start: 300, start: 300,
@ -348,12 +373,59 @@ func TestParamsSetCorrectly(t *testing.T) {
paramStart: 155, // 300 - 120 - 5 - 10 - 10 paramStart: 155, // 300 - 120 - 5 - 10 - 10
paramEnd: 490, paramEnd: 490,
paramFunc: "count_over_time",
}, {
query: "sum by (dim1) (foo)",
start: 10,
paramStart: 5,
paramEnd: 10,
paramGrouping: []string{"dim1"},
paramBy: true,
paramFunc: "sum",
}, {
query: "sum without (dim1) (foo)",
start: 10,
paramStart: 5,
paramEnd: 10,
paramGrouping: []string{"dim1"},
paramBy: false,
paramFunc: "sum",
}, {
query: "sum by (dim1) (avg_over_time(foo[1s]))",
start: 10,
paramStart: 9,
paramEnd: 10,
paramGrouping: nil,
paramBy: false,
paramRange: 1000,
paramFunc: "avg_over_time",
}, {
query: "sum by (dim1) (max by (dim2) (foo))",
start: 10,
paramStart: 5,
paramEnd: 10,
paramGrouping: []string{"dim2"},
paramBy: true,
paramFunc: "max",
}, {
query: "(max by (dim1) (foo))[5s:1s]",
start: 10,
paramStart: 0,
paramEnd: 10,
paramGrouping: []string{"dim1"},
paramBy: true,
paramFunc: "max",
}} }}
for _, tc := range cases { for _, tc := range cases {
engine := NewEngine(opts) engine := NewEngine(opts)
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &paramCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, t: t}, nil return &paramCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil
}) })
var ( var (

View file

@ -71,6 +71,10 @@ type SelectParams struct {
Step int64 // Query step size in milliseconds. Step int64 // Query step size in milliseconds.
Func string // String representation of surrounding function or aggregation. Func string // String representation of surrounding function or aggregation.
Grouping []string // List of label names used in aggregation.
By bool // Indicate whether it is without or by.
Range int64 // Range vector selector range in milliseconds.
} }
// QueryableFunc is an adapter to allow the use of ordinary functions as // QueryableFunc is an adapter to allow the use of ordinary functions as

View file

@ -88,10 +88,13 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams
var rp *prompb.ReadHints var rp *prompb.ReadHints
if p != nil { if p != nil {
rp = &prompb.ReadHints{ rp = &prompb.ReadHints{
StepMs: p.Step, StepMs: p.Step,
Func: p.Func, Func: p.Func,
StartMs: p.Start, StartMs: p.Start,
EndMs: p.End, EndMs: p.End,
Grouping: p.Grouping,
By: p.By,
RangeMs: p.Range,
} }
} }