test additional len and lenbytes formats

Co-authored-by: Nicolás Pazos <npazosmendez@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Callum Styan 2023-11-15 10:04:38 -08:00 committed by Nicolás Pazos
parent 8b0cda8bb3
commit 97d0556584
9 changed files with 1922 additions and 134 deletions

View file

@ -60,7 +60,7 @@ func (x ReadRequest_ResponseType) String() string {
}
func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{6, 0}
return fileDescriptor_eefc82927d57d89b, []int{8, 0}
}
type WriteRequest struct {
@ -289,6 +289,120 @@ func (m *MinimizedWriteRequestBytes) GetSymbols() string {
return ""
}
type MinimizedWriteRequestLen struct {
Timeseries []MinimizedTimeSeriesLen `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
Symbols []byte `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestLen) Reset() { *m = MinimizedWriteRequestLen{} }
func (m *MinimizedWriteRequestLen) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestLen) ProtoMessage() {}
func (*MinimizedWriteRequestLen) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4}
}
func (m *MinimizedWriteRequestLen) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestLen) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestLen.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestLen) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestLen.Merge(m, src)
}
func (m *MinimizedWriteRequestLen) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestLen) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestLen.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestLen proto.InternalMessageInfo
func (m *MinimizedWriteRequestLen) GetTimeseries() []MinimizedTimeSeriesLen {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestLen) GetSymbols() []byte {
if m != nil {
return m.Symbols
}
return nil
}
type MinimizedWriteRequestLenBytes struct {
Timeseries []MinimizedTimeSeriesLenBytes `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
Symbols []byte `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestLenBytes) Reset() { *m = MinimizedWriteRequestLenBytes{} }
func (m *MinimizedWriteRequestLenBytes) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestLenBytes) ProtoMessage() {}
func (*MinimizedWriteRequestLenBytes) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5}
}
func (m *MinimizedWriteRequestLenBytes) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestLenBytes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestLenBytes.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestLenBytes) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestLenBytes.Merge(m, src)
}
func (m *MinimizedWriteRequestLenBytes) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestLenBytes) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestLenBytes.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestLenBytes proto.InternalMessageInfo
func (m *MinimizedWriteRequestLenBytes) GetTimeseries() []MinimizedTimeSeriesLenBytes {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestLenBytes) GetSymbols() []byte {
if m != nil {
return m.Symbols
}
return nil
}
type MinimizedWriteRequestFixed32 struct {
Timeseries []MinimizedTimeSeriesFixed32 `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings. To read the symbols table, it's required
@ -303,7 +417,7 @@ func (m *MinimizedWriteRequestFixed32) Reset() { *m = MinimizedWriteRequ
func (m *MinimizedWriteRequestFixed32) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestFixed32) ProtoMessage() {}
func (*MinimizedWriteRequestFixed32) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4}
return fileDescriptor_eefc82927d57d89b, []int{6}
}
func (m *MinimizedWriteRequestFixed32) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -360,7 +474,7 @@ func (m *MinimizedWriteRequestFixed64) Reset() { *m = MinimizedWriteRequ
func (m *MinimizedWriteRequestFixed64) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestFixed64) ProtoMessage() {}
func (*MinimizedWriteRequestFixed64) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5}
return fileDescriptor_eefc82927d57d89b, []int{7}
}
func (m *MinimizedWriteRequestFixed64) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -421,7 +535,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{6}
return fileDescriptor_eefc82927d57d89b, []int{8}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -477,7 +591,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{7}
return fileDescriptor_eefc82927d57d89b, []int{9}
}
func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -527,7 +641,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{8}
return fileDescriptor_eefc82927d57d89b, []int{10}
}
func (m *Query) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -596,7 +710,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{9}
return fileDescriptor_eefc82927d57d89b, []int{11}
}
func (m *QueryResult) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -649,7 +763,7 @@ func (m *ChunkedReadResponse) Reset() { *m = ChunkedReadResponse{} }
func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) }
func (*ChunkedReadResponse) ProtoMessage() {}
func (*ChunkedReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{10}
return fileDescriptor_eefc82927d57d89b, []int{12}
}
func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -698,6 +812,8 @@ func init() {
proto.RegisterType((*MinimizedWriteRequestPacking)(nil), "prometheus.MinimizedWriteRequestPacking")
proto.RegisterType((*MinimizedWriteRequest)(nil), "prometheus.MinimizedWriteRequest")
proto.RegisterType((*MinimizedWriteRequestBytes)(nil), "prometheus.MinimizedWriteRequestBytes")
proto.RegisterType((*MinimizedWriteRequestLen)(nil), "prometheus.MinimizedWriteRequestLen")
proto.RegisterType((*MinimizedWriteRequestLenBytes)(nil), "prometheus.MinimizedWriteRequestLenBytes")
proto.RegisterType((*MinimizedWriteRequestFixed32)(nil), "prometheus.MinimizedWriteRequestFixed32")
proto.RegisterType((*MinimizedWriteRequestFixed64)(nil), "prometheus.MinimizedWriteRequestFixed64")
proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest")
@ -710,46 +826,49 @@ func init() {
func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) }
var fileDescriptor_eefc82927d57d89b = []byte{
// 621 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x95, 0xdf, 0x4e, 0x13, 0x41,
0x14, 0xc6, 0x19, 0x5a, 0x68, 0x3d, 0x8b, 0xa4, 0x0e, 0x20, 0xb5, 0x31, 0x40, 0x36, 0xc6, 0x34,
0xc1, 0xd4, 0x08, 0x0d, 0x57, 0x5c, 0x08, 0x58, 0x83, 0xc8, 0x2a, 0x4e, 0x6b, 0x34, 0xc6, 0x64,
0xb3, 0xdd, 0x3d, 0xa1, 0x13, 0xd8, 0x3f, 0xec, 0x4c, 0x13, 0xd6, 0x6b, 0xaf, 0x8d, 0xf1, 0x91,
0xbc, 0xe2, 0xca, 0xf8, 0x04, 0xc6, 0xf0, 0x24, 0x66, 0xff, 0x95, 0xa9, 0xae, 0xc1, 0x34, 0xf1,
0x6e, 0xf7, 0x9c, 0xef, 0xfb, 0xf6, 0x37, 0x73, 0xa6, 0x53, 0x98, 0x0b, 0xd1, 0xf5, 0x25, 0xb6,
0x82, 0xd0, 0x97, 0x3e, 0x85, 0x20, 0xf4, 0x5d, 0x94, 0x03, 0x1c, 0x8a, 0x86, 0x26, 0xa3, 0x00,
0x45, 0xda, 0x68, 0x2c, 0x1e, 0xfb, 0xc7, 0x7e, 0xf2, 0xf8, 0x30, 0x7e, 0x4a, 0xab, 0xfa, 0x67,
0x02, 0x73, 0x6f, 0x42, 0x2e, 0x91, 0xe1, 0xd9, 0x10, 0x85, 0xa4, 0xdb, 0x00, 0x92, 0xbb, 0x28,
0x30, 0xe4, 0x28, 0xea, 0x64, 0xad, 0xd4, 0xd4, 0x36, 0x6e, 0xb7, 0xae, 0x42, 0x5b, 0x3d, 0xee,
0x62, 0x37, 0xe9, 0xee, 0x96, 0x2f, 0x7e, 0xac, 0x4e, 0x31, 0x45, 0x4f, 0xb7, 0xa1, 0xea, 0xa2,
0xb4, 0x1c, 0x4b, 0x5a, 0xf5, 0x52, 0xe2, 0x6d, 0xa8, 0x5e, 0x03, 0x65, 0xc8, 0x6d, 0x23, 0x53,
0x64, 0xfe, 0x91, 0xe3, 0xa0, 0x5c, 0x9d, 0xae, 0x95, 0xf4, 0x2f, 0x04, 0xee, 0x1a, 0xdc, 0xe3,
0x2e, 0xff, 0x80, 0x8e, 0xca, 0x76, 0x64, 0xd9, 0x27, 0xdc, 0x3b, 0xa6, 0x87, 0x05, 0x88, 0xf7,
0xc7, 0x3e, 0x93, 0xbb, 0xaf, 0x58, 0x33, 0x6f, 0x01, 0x72, 0x1d, 0x2a, 0x22, 0x72, 0xfb, 0xfe,
0xa9, 0xa8, 0x97, 0xd7, 0x48, 0xf3, 0x06, 0xcb, 0x5f, 0x53, 0x9c, 0x83, 0x72, 0xb5, 0x54, 0x2b,
0xeb, 0x1f, 0x09, 0x2c, 0x15, 0x42, 0xd1, 0x4e, 0x01, 0xcd, 0xea, 0x35, 0x34, 0x13, 0x63, 0x7c,
0x22, 0xd0, 0x28, 0xc4, 0xd8, 0x8d, 0x24, 0x0a, 0x7a, 0x50, 0xc0, 0x72, 0xef, 0x3a, 0x96, 0xd8,
0x39, 0x31, 0xd0, 0x5f, 0x87, 0xf5, 0x94, 0x9f, 0xa3, 0xb3, 0xb9, 0x31, 0xd1, 0xb0, 0x32, 0xef,
0x7f, 0x82, 0xda, 0x6a, 0x4f, 0x0e, 0xb5, 0xd5, 0x9e, 0x18, 0xea, 0x1b, 0x01, 0x8d, 0xa1, 0xe5,
0xe4, 0xe7, 0x66, 0x1d, 0x2a, 0x67, 0x43, 0x15, 0xe0, 0x96, 0x0a, 0xf0, 0x6a, 0x88, 0x61, 0xc4,
0x72, 0x05, 0x7d, 0x0f, 0xcb, 0x96, 0x6d, 0x63, 0x20, 0xd1, 0x31, 0x43, 0x14, 0x81, 0xef, 0x09,
0x34, 0x93, 0x5f, 0x77, 0x7d, 0x7a, 0xad, 0xd4, 0x9c, 0x1f, 0x9f, 0xb2, 0xf2, 0x99, 0x16, 0xcb,
0xd4, 0xbd, 0x28, 0x40, 0xb6, 0x94, 0x87, 0xa8, 0x55, 0xa1, 0xb7, 0x61, 0x4e, 0x2d, 0x50, 0x0d,
0x2a, 0xdd, 0x1d, 0xe3, 0xe8, 0xb0, 0xd3, 0xad, 0x4d, 0xd1, 0x65, 0x58, 0xe8, 0xf6, 0x58, 0x67,
0xc7, 0xe8, 0x3c, 0x31, 0xdf, 0xbe, 0x64, 0xe6, 0xde, 0xfe, 0xeb, 0x17, 0xcf, 0xbb, 0x35, 0xa2,
0xef, 0xc4, 0x2e, 0x6b, 0x14, 0x45, 0x1f, 0x41, 0x25, 0x44, 0x31, 0x3c, 0x95, 0xf9, 0x82, 0x96,
0xff, 0x5c, 0x50, 0xd2, 0x67, 0xb9, 0x4e, 0xff, 0x4a, 0x60, 0x26, 0x69, 0xd0, 0x07, 0x40, 0x85,
0xb4, 0x42, 0x69, 0x26, 0xfb, 0x2a, 0x2d, 0x37, 0x30, 0xdd, 0x38, 0x87, 0x34, 0x4b, 0xac, 0x96,
0x74, 0x7a, 0x79, 0xc3, 0x10, 0xb4, 0x09, 0x35, 0xf4, 0x9c, 0x71, 0xed, 0x74, 0xa2, 0x9d, 0x47,
0xcf, 0x51, 0x95, 0x6d, 0xa8, 0xba, 0x96, 0xb4, 0x07, 0x18, 0x8a, 0xec, 0x42, 0xaa, 0xab, 0x54,
0x87, 0x56, 0x1f, 0x4f, 0x8d, 0x54, 0xc0, 0x46, 0x4a, 0xba, 0x0e, 0x33, 0x03, 0xee, 0xc9, 0x74,
0x9e, 0xda, 0xc6, 0xd2, 0xef, 0x9b, 0xbb, 0x1f, 0x37, 0x59, 0xaa, 0xd1, 0x3b, 0xa0, 0x29, 0x8b,
0xa3, 0x5b, 0xff, 0x7e, 0x81, 0xaa, 0xa7, 0x48, 0x3f, 0x87, 0x85, 0xbd, 0xc1, 0xd0, 0x3b, 0x89,
0x87, 0xa3, 0xec, 0xea, 0x63, 0x98, 0xb7, 0xd3, 0xb2, 0x39, 0x16, 0x79, 0x47, 0x8d, 0xcc, 0x8c,
0x59, 0xea, 0x4d, 0x5b, 0x7d, 0xa5, 0xab, 0xa0, 0xc5, 0xc7, 0x28, 0x32, 0xb9, 0xe7, 0xe0, 0x79,
0xb6, 0x4f, 0x90, 0x94, 0x9e, 0xc5, 0x95, 0xdd, 0xc5, 0x8b, 0xcb, 0x15, 0xf2, 0xfd, 0x72, 0x85,
0xfc, 0xbc, 0x5c, 0x21, 0xef, 0x66, 0xe3, 0xdc, 0xa0, 0xdf, 0x9f, 0x4d, 0xfe, 0x20, 0x36, 0x7f,
0x05, 0x00, 0x00, 0xff, 0xff, 0xf7, 0x0a, 0xf4, 0xd7, 0x5f, 0x06, 0x00, 0x00,
// 669 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x95, 0x5f, 0x4f, 0x13, 0x4d,
0x14, 0xc6, 0x19, 0x5a, 0x68, 0xdf, 0xb3, 0x7d, 0x49, 0xdf, 0x01, 0x5e, 0xd6, 0x46, 0x81, 0x6c,
0x8c, 0x36, 0xc1, 0xd4, 0x08, 0x0d, 0x57, 0x5c, 0x08, 0x58, 0x83, 0xd8, 0x55, 0x9c, 0xd6, 0x68,
0x8c, 0xc9, 0x66, 0xbb, 0x7b, 0x42, 0x37, 0xb0, 0x7f, 0xd8, 0x99, 0x26, 0xd4, 0x6b, 0xaf, 0xbc,
0x30, 0xc6, 0xf8, 0x89, 0xbc, 0xe2, 0xca, 0xf8, 0x09, 0x8c, 0xe1, 0x93, 0x98, 0xfd, 0x07, 0x53,
0x5d, 0x2c, 0xe9, 0xdd, 0xee, 0x39, 0xcf, 0xf3, 0xec, 0x6f, 0xce, 0xcc, 0xee, 0x42, 0x25, 0x44,
0xd7, 0x17, 0xd8, 0x08, 0x42, 0x5f, 0xf8, 0x14, 0x82, 0xd0, 0x77, 0x51, 0xf4, 0x71, 0xc0, 0x6b,
0x8a, 0x18, 0x06, 0xc8, 0x93, 0x46, 0x6d, 0xe1, 0xd0, 0x3f, 0xf4, 0xe3, 0xcb, 0xfb, 0xd1, 0x55,
0x52, 0xd5, 0x3e, 0x11, 0xa8, 0xbc, 0x0a, 0x1d, 0x81, 0x0c, 0x4f, 0x06, 0xc8, 0x05, 0xdd, 0x02,
0x10, 0x8e, 0x8b, 0x1c, 0x43, 0x07, 0xb9, 0x4a, 0x56, 0x0b, 0x75, 0x65, 0xfd, 0xff, 0xc6, 0x65,
0x68, 0xa3, 0xeb, 0xb8, 0xd8, 0x89, 0xbb, 0x3b, 0xc5, 0xb3, 0x1f, 0x2b, 0x53, 0x4c, 0xd2, 0xd3,
0x2d, 0x28, 0xbb, 0x28, 0x4c, 0xdb, 0x14, 0xa6, 0x5a, 0x88, 0xbd, 0x35, 0xd9, 0xab, 0xa3, 0x08,
0x1d, 0x4b, 0x4f, 0x15, 0xa9, 0xff, 0xc2, 0xb1, 0x5f, 0x2c, 0x4f, 0x57, 0x0b, 0xda, 0x67, 0x02,
0x37, 0x75, 0xc7, 0x73, 0x5c, 0xe7, 0x1d, 0xda, 0x32, 0xdb, 0x81, 0x69, 0x1d, 0x39, 0xde, 0x21,
0x6d, 0xe7, 0x20, 0xde, 0x19, 0x79, 0x4c, 0xe6, 0xbe, 0x64, 0x4d, 0xbd, 0x39, 0xc8, 0x2a, 0x94,
0xf8, 0xd0, 0xed, 0xf9, 0xc7, 0x5c, 0x2d, 0xae, 0x92, 0xfa, 0x3f, 0x2c, 0xbb, 0x4d, 0x70, 0xf6,
0x8b, 0xe5, 0x42, 0xb5, 0xa8, 0xbd, 0x27, 0xb0, 0x98, 0x0b, 0x45, 0x5b, 0x39, 0x34, 0x2b, 0x63,
0x68, 0x26, 0xc6, 0xf8, 0x48, 0xa0, 0x96, 0x8b, 0xb1, 0x33, 0x14, 0xc8, 0xe9, 0x7e, 0x0e, 0xcb,
0xed, 0x71, 0x2c, 0x91, 0x73, 0x62, 0xa0, 0x0f, 0x04, 0xd4, 0x5c, 0xa0, 0x36, 0x7a, 0x74, 0x2f,
0x07, 0x47, 0x1b, 0x83, 0xd3, 0x46, 0x6f, 0x3c, 0x4c, 0x25, 0x1f, 0xe6, 0x0b, 0x81, 0x5b, 0x57,
0xc1, 0x24, 0x03, 0xd2, 0x73, 0x88, 0xee, 0x5e, 0x83, 0xe8, 0x7a, 0x33, 0xba, 0x02, 0xeb, 0xca,
0x03, 0xfd, 0xd8, 0x39, 0x45, 0x7b, 0x63, 0x7d, 0xa2, 0x03, 0x9d, 0x7a, 0x27, 0xde, 0xb8, 0xbf,
0x43, 0x6d, 0x36, 0x27, 0x87, 0xda, 0x6c, 0x4e, 0x0c, 0xf5, 0x8d, 0x80, 0xc2, 0xd0, 0xb4, 0xb3,
0x77, 0x6b, 0x0d, 0x4a, 0x27, 0x03, 0x19, 0xe0, 0x3f, 0x19, 0xe0, 0xc5, 0x00, 0xc3, 0x21, 0xcb,
0x14, 0xf4, 0x2d, 0x2c, 0x99, 0x96, 0x85, 0x81, 0x40, 0xdb, 0x08, 0x91, 0x07, 0xbe, 0xc7, 0xd1,
0x88, 0xbf, 0x80, 0xea, 0xf4, 0x6a, 0xa1, 0x3e, 0x37, 0xfa, 0x26, 0x48, 0x8f, 0x69, 0xb0, 0x54,
0xdd, 0x1d, 0x06, 0xc8, 0x16, 0xb3, 0x10, 0xb9, 0xca, 0xb5, 0x26, 0x54, 0xe4, 0x02, 0x55, 0xa0,
0xd4, 0xd9, 0xd6, 0x0f, 0xda, 0xad, 0x4e, 0x75, 0x8a, 0x2e, 0xc1, 0x7c, 0xa7, 0xcb, 0x5a, 0xdb,
0x7a, 0xeb, 0x91, 0xf1, 0xfa, 0x39, 0x33, 0x76, 0xf7, 0x5e, 0x3e, 0x7b, 0xda, 0xa9, 0x12, 0x6d,
0x3b, 0x72, 0x99, 0x17, 0x51, 0xf4, 0x01, 0x94, 0x42, 0xe4, 0x83, 0x63, 0x91, 0x2d, 0x68, 0xe9,
0xcf, 0x05, 0xc5, 0x7d, 0x96, 0xe9, 0xb4, 0xaf, 0x04, 0x66, 0xe2, 0x06, 0xbd, 0x07, 0x94, 0x0b,
0x33, 0x14, 0x46, 0x3c, 0x57, 0x61, 0xba, 0x81, 0xe1, 0x46, 0x39, 0xa4, 0x5e, 0x60, 0xd5, 0xb8,
0xd3, 0xcd, 0x1a, 0x3a, 0xa7, 0x75, 0xa8, 0xa2, 0x67, 0x8f, 0x6a, 0xa7, 0x63, 0xed, 0x1c, 0x7a,
0xb6, 0xac, 0x6c, 0x42, 0xd9, 0x35, 0x85, 0xd5, 0xc7, 0x90, 0xa7, 0x1f, 0x6d, 0x55, 0xa6, 0x6a,
0x9b, 0x3d, 0x3c, 0xd6, 0x13, 0x01, 0xbb, 0x50, 0xd2, 0x35, 0x98, 0xe9, 0x3b, 0x9e, 0x48, 0xf6,
0x53, 0x59, 0x5f, 0xfc, 0x7d, 0xb8, 0x7b, 0x51, 0x93, 0x25, 0x1a, 0xad, 0x05, 0x8a, 0xb4, 0x38,
0xba, 0x79, 0xfd, 0x9f, 0x8c, 0x7c, 0x8a, 0xb4, 0x53, 0x98, 0xdf, 0xed, 0x0f, 0xbc, 0xa3, 0x68,
0x73, 0xa4, 0xa9, 0x3e, 0x84, 0x39, 0x2b, 0x29, 0x1b, 0x23, 0x91, 0x37, 0xe4, 0xc8, 0xd4, 0x98,
0xa6, 0xfe, 0x6b, 0xc9, 0xb7, 0x74, 0x05, 0x94, 0xe8, 0x18, 0x0d, 0x0d, 0xc7, 0xb3, 0xf1, 0x34,
0x9d, 0x13, 0xc4, 0xa5, 0x27, 0x51, 0x65, 0x67, 0xe1, 0xec, 0x7c, 0x99, 0x7c, 0x3f, 0x5f, 0x26,
0x3f, 0xcf, 0x97, 0xc9, 0x9b, 0xd9, 0x28, 0x37, 0xe8, 0xf5, 0x66, 0xe3, 0x9f, 0xe8, 0xc6, 0xaf,
0x00, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x56, 0x04, 0x75, 0x83, 0x07, 0x00, 0x00,
}
func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
@ -951,6 +1070,102 @@ func (m *MinimizedWriteRequestBytes) MarshalToSizedBuffer(dAtA []byte) (int, err
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestLen) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestLen) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestLen) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestLenBytes) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestLenBytes) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestLenBytes) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestFixed32) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -1398,6 +1613,50 @@ func (m *MinimizedWriteRequestBytes) Size() (n int) {
return n
}
func (m *MinimizedWriteRequestLen) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
l = len(m.Symbols)
if l > 0 {
n += 1 + l + sovRemote(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *MinimizedWriteRequestLenBytes) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
l = len(m.Symbols)
if l > 0 {
n += 1 + l + sovRemote(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *MinimizedWriteRequestFixed32) Size() (n int) {
if m == nil {
return 0
@ -2028,6 +2287,244 @@ func (m *MinimizedWriteRequestBytes) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *MinimizedWriteRequestLen) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestLen: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestLen: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesLen{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols[:0], dAtA[iNdEx:postIndex]...)
if m.Symbols == nil {
m.Symbols = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequestLenBytes) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestLenBytes: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestLenBytes: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesLenBytes{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols[:0], dAtA[iNdEx:postIndex]...)
if m.Symbols == nil {
m.Symbols = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequestFixed32) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

View file

@ -63,6 +63,30 @@ message MinimizedWriteRequestBytes {
string symbols = 4;
}
message MinimizedWriteRequestLen {
repeated MinimizedTimeSeriesLen timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
// Metadata (3) has moved to be part of the TimeSeries type
reserved 3;
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
bytes symbols = 4;
}
message MinimizedWriteRequestLenBytes {
repeated MinimizedTimeSeriesLenBytes timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
// Metadata (3) has moved to be part of the TimeSeries type
reserved 3;
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
bytes symbols = 4;
}
message MinimizedWriteRequestFixed32 {
repeated MinimizedTimeSeriesFixed32 timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.

File diff suppressed because it is too large Load diff

View file

@ -195,6 +195,32 @@ message MinimizedTimeSeriesBytes {
// TODO: add metadata
}
message MinimizedTimeSeriesLen {
// Sorted list of label name-value pair references, encoded as varints. This
// list's real len is always multiple of 2, label name offset/label value offset.
repeated fixed32 label_symbols = 1;
// Sorted by time, oldest sample first.
// TODO: support references for other types
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message MinimizedTimeSeriesLenBytes{
// Sorted list of label name-value pair references, encoded as varints. This
// list's real len is always multiple of 2, label name offset/label value offset.
bytes label_symbols = 1;
// Sorted by time, oldest sample first.
// TODO: support references for other types
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message Label {
string name = 1;
string value = 2;

View file

@ -11,6 +11,10 @@ INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-
INSTANCES+=('sender-v11-min64-fixed;--remote-write-format 2;receiver-v11-min64-fixed;--remote-write-format 2')
INSTANCES+=('sender-v11-min32-fixed;--remote-write-format 3;receiver-v11-min32-fixed;--remote-write-format 3')
INSTANCES+=('sender-v11-min-bytes;--remote-write-format 4;receiver-v11-min-bytes;--remote-write-format 4')
INSTANCES+=('sender-v11-min-len;--remote-write-format 5;receiver-v11-min-len;--remote-write-format 5')
INSTANCES+=('sender-v11-min-len-bytes;--remote-write-format 6;receiver-v11-min-len-bytes;--remote-write-format 6')
# ~~~~~~~~~~~~~

View file

@ -24,6 +24,7 @@ import (
"sort"
"strings"
"sync"
"unsafe"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@ -826,6 +827,44 @@ func labelsToByteSlice(lbls labels.Labels, symbolTable *rwSymbolTable, buf []byt
return result
}
func labelsToUint32SliceLen(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
result := buf[:0]
lbls.Range(func(l labels.Label) {
off := symbolTable.RefLen(l.Name)
result = append(result, off)
off = symbolTable.RefLen(l.Value)
result = append(result, off)
})
return result
}
func Uint32LenRefToLabels(symbols []byte, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
labelIdx := 0
for labelIdx < len(minLabels) {
// todo, check for overflow?
offset := minLabels[labelIdx]
labelIdx++
length, n := binary.Uvarint(symbols[offset:])
offset += uint32(n)
name := symbols[offset : uint64(offset)+length]
offset = minLabels[labelIdx]
labelIdx++
length, n = binary.Uvarint(symbols[offset:])
offset += uint32(n)
value := symbols[offset : uint64(offset)+length]
ls.Add(yoloString(name), yoloString(value))
}
return ls.Labels()
}
func yoloString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
func Uint32RefToLabels(symbols string, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
@ -855,6 +894,7 @@ func ByteSliceToLabels(symbols string, minLabels []byte) labels.Labels {
for len(minLabels) > 0 {
// todo, check for overflow?
offset, n := binary.Uvarint(minLabels)
//fmt.Println(:"offset")
minLabels = minLabels[n:]
length, n := binary.Uvarint(minLabels)
minLabels = minLabels[n:]
@ -888,6 +928,46 @@ func ByteSliceToLabels(symbols string, minLabels []byte) labels.Labels {
return ls.Labels()
}
func ByteSliceToLabelsSymbolsByte(symbols []byte, minLabels []byte) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
for len(minLabels) > 0 {
// todo, check for overflow?
offset, n := binary.Uvarint(minLabels)
//fmt.Println(:"offset")
minLabels = minLabels[n:]
length, n := binary.Uvarint(minLabels)
minLabels = minLabels[n:]
name := symbols[offset : offset+length]
// todo, check for overflow?
offset, n = binary.Uvarint(minLabels)
minLabels = minLabels[n:]
length, n = binary.Uvarint(minLabels)
minLabels = minLabels[n:]
value := symbols[offset : offset+length]
ls.Add(string(name), string(value))
}
// labelIdx := 0
// for labelIdx < len(minLabels) {
// // todo, check for overflow?
// offset := minLabels[labelIdx]
// labelIdx++
// length := minLabels[labelIdx]
// labelIdx++
// name := symbols[offset : offset+length]
// // todo, check for overflow?
// offset = minLabels[labelIdx]
// labelIdx++
// length = minLabels[labelIdx]
// labelIdx++
// value := symbols[offset : offset+length]
// ls.Add(name, value)
// }
return ls.Labels()
}
func Uint64RefToLabels(symbols string, minLabels []uint64) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
labelIdx := 0
@ -1068,6 +1148,44 @@ func DecodeMinimizedWriteRequestBytes(r io.Reader) (*prompb.MinimizedWriteReques
return &req, nil
}
func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestLen
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func DecodeMinimizedWriteRequestLenBytes(r io.Reader) (*prompb.MinimizedWriteRequestLenBytes, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestLenBytes
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),

View file

@ -932,3 +932,11 @@ func TestPackRef64(t *testing.T) {
require.Equal(t, value, string(rw.symbols[valOffset:valOffset+valLength]))
}
func TestLenFormat(t *testing.T) {
r := rwSymbolTable{}
ls := labels.FromStrings("asdf", "qwer", "zxcv", "1234")
encoded := labelsToUint32SliceLen(ls, &r, nil)
decoded := Uint32LenRefToLabels(r.LabelsData(), encoded)
require.Equal(t, ls, decoded)
}

View file

@ -15,6 +15,7 @@ package remote
import (
"context"
"encoding/binary"
"errors"
"math"
"strconv"
@ -396,7 +397,9 @@ const (
Min32Optimized // two 32bit varint plus marshalling optimization
Min64Fixed // a single fixed64 bit value, first 32 are offset and 2nd 32 are
Min32Fixed
MinBytes // two 32bit fixed, similar to optimized but not varints + no manual marshalling optimization
MinBytes // two 32bit fixed, similar to optimized but not varints + no manual marshalling optimization
MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice)
MinLenBytes // the previous two combined
)
// QueueManager manages a queue of samples to be sent to the Storage
@ -1402,6 +1405,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pendingMinBytesData[i].Samples = []prompb.Sample{{}}
}
pendingMinLenData := make([]prompb.MinimizedTimeSeriesLen, max)
for i := range pendingMinLenData {
pendingMinLenData[i].Samples = []prompb.Sample{{}}
}
pendingMinLenBytesData := make([]prompb.MinimizedTimeSeriesLenBytes, max)
for i := range pendingMinLenData {
pendingMinLenBytesData[i].Samples = []prompb.Sample{{}}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
if !timer.Stop() {
@ -1459,7 +1472,17 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
case MinBytes:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesBytes(&symbolTable, batch, pendingMinBytesData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMin32Samples(ctx, pendingMin32Data[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
s.sendMinBytes(ctx, pendingMinBytesData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
case MinLen:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
case MinLenBytes:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLenBytes(&symbolTable, batch, pendingMinLenBytesData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinLenBytesSamples(ctx, pendingMinLenBytesData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
}
@ -1588,6 +1611,30 @@ func (s *shards) sendMin32Samples(ctx context.Context, samples []prompb.Minimize
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinLenBytesSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLenBytes, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestLenBytes(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinBytes(ctx context.Context, samples []prompb.MinimizedTimeSeriesBytes, labels string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
@ -1917,6 +1964,104 @@ func populateMinimizedTimeSeriesPacking(symbolTable *rwSymbolTable, batch []time
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func populateMinimizedTimeSeriesLen(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesLen, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceLen(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
// TODO: handle all types
//case tExemplar:
// l := make([]prompb.LabelRef, 0, d.exemplarLabels.Len())
// d.exemplarLabels.Range(func(el labels.Label) {
// nRef := pool.intern(el.Name)
// vRef := pool.intern(el.Value)
// l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
// })
// pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
// Labels: l,
// Value: d.value,
// Timestamp: d.timestamp,
// })
// nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func populateMinimizedTimeSeriesLenBytes(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesLenBytes, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToByteSlice(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
// TODO: handle all types
//case tExemplar:
// l := make([]prompb.LabelRef, 0, d.exemplarLabels.Len())
// d.exemplarLabels.Range(func(el labels.Label) {
// nRef := pool.intern(el.Name)
// vRef := pool.intern(el.Value)
// l = append(l, prompb.LabelRef{NameRef: nRef, ValueRef: vRef})
// })
// pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.ExemplarRef{
// Labels: l,
// Value: d.value,
// Timestamp: d.timestamp,
// })
// nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff
sleepDuration := model.Duration(0)
@ -2025,6 +2170,7 @@ type rwSymbolTable struct {
symbolsMap map[string]offLenPair
symbolsMap64Packed map[string]uint64
symbolsMap32Packed map[string]uint32
symbolsMapBytes map[string]uint32
}
func newRwSymbolTable() rwSymbolTable {
@ -2032,6 +2178,7 @@ func newRwSymbolTable() rwSymbolTable {
symbolsMap: make(map[string]offLenPair),
symbolsMap64Packed: make(map[string]uint64),
symbolsMap32Packed: make(map[string]uint32),
symbolsMapBytes: make(map[string]uint32),
}
}
@ -2075,10 +2222,25 @@ func (r *rwSymbolTable) Ref32Packed(str string) uint32 {
return r.symbolsMap32Packed[str]
}
func (r *rwSymbolTable) RefLen(str string) uint32 {
if ref, ok := r.symbolsMapBytes[str]; ok {
return ref
}
ref := uint32(len(r.symbols))
r.symbols = binary.AppendUvarint(r.symbols, uint64(len(str)))
r.symbols = append(r.symbols, str...)
r.symbolsMapBytes[str] = ref
return ref
}
func (r *rwSymbolTable) LabelsString() string {
return *((*string)(unsafe.Pointer(&r.symbols)))
}
func (r *rwSymbolTable) LabelsData() []byte {
return r.symbols
}
func (r *rwSymbolTable) clear() {
for k := range r.symbolsMap {
delete(r.symbolsMap, k)
@ -2089,6 +2251,9 @@ func (r *rwSymbolTable) clear() {
for k := range r.symbolsMap32Packed {
delete(r.symbolsMap32Packed, k)
}
for k := range r.symbolsMapBytes {
delete(r.symbolsMapBytes, k)
}
r.symbols = r.symbols[:0]
}
@ -2229,6 +2394,98 @@ func buildMinimizedWriteRequestFixed32(samples []prompb.MinimizedTimeSeriesFixed
return compressed, highest, nil
}
func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequestLen{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
func buildMinimizedWriteRequestLenBytes(samples []prompb.MinimizedTimeSeriesLenBytes, labels []byte, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequestLenBytes{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); buf != nil && n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
func buildMinimizedWriteRequestBytes(samples []prompb.MinimizedTimeSeriesBytes, labels string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {

View file

@ -77,6 +77,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var reqMin64Fixed *prompb.MinimizedWriteRequestFixed64
var reqMin32Fixed *prompb.MinimizedWriteRequestFixed32
var reqMinBytes *prompb.MinimizedWriteRequestBytes
var reqMinLen *prompb.MinimizedWriteRequestLen
var reqMinLenBytes *prompb.MinimizedWriteRequestLenBytes
// TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat {
@ -90,6 +92,10 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
reqMin32Fixed, err = DecodeMinimizedWriteRequestFixed32(r.Body)
case MinBytes:
reqMinBytes, err = DecodeMinimizedWriteRequestBytes(r.Body)
case MinLen:
reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body)
case MinLenBytes:
reqMinLenBytes, err = DecodeMinimizedWriteRequestLenBytes(r.Body)
}
if err != nil {
@ -110,6 +116,10 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = h.writeMin32(r.Context(), reqMin32Fixed)
case MinBytes:
err = h.writeMinBytes(r.Context(), reqMinBytes)
case MinLen:
err = h.writeMinLen(r.Context(), reqMinLen)
case MinLenBytes:
err = h.writeMinLenBytes(r.Context(), reqMinLenBytes)
}
switch {
@ -435,6 +445,83 @@ func (h *writeHandler) writeMinBytes(ctx context.Context, req *prompb.MinimizedW
for _, ts := range req.Timeseries {
ls := ByteSliceToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
//e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}
func (h *writeHandler) writeMinLen(ctx context.Context, req *prompb.MinimizedWriteRequestLen) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32LenRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
//e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}
func (h *writeHandler) writeMinLenBytes(ctx context.Context, req *prompb.MinimizedWriteRequestLenBytes) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := ByteSliceToLabelsSymbolsByte(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {