new interning format based on []string indeces

Co-authored-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-11-28 11:54:32 -03:00
parent 3e48b8a582
commit 31d3956f47
9 changed files with 2014 additions and 125 deletions

View file

@ -167,3 +167,134 @@ func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, e
} }
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *MinimizedWriteRequestStr) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
dst = make([]byte, siz)
}
dst = dst[:siz]
n, err := m.OptimizedMarshalToSizedBuffer(dst)
if err != nil {
return nil, err
}
return (dst)[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *MinimizedWriteRequestStr) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x22
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelSymbols in place without extra allocations.
func (m *MinimizedTimeSeriesStr) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelSymbols) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelSymbols {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}

View file

@ -60,7 +60,7 @@ func (x ReadRequest_ResponseType) String() string {
} }
func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) { func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3, 0} return fileDescriptor_eefc82927d57d89b, []int{5, 0}
} }
type WriteRequest struct { type WriteRequest struct {
@ -232,6 +232,116 @@ func (m *MinimizedWriteRequestLen) GetSymbols() []byte {
return nil return nil
} }
type MinimizedWriteRequestStr struct {
Timeseries []MinimizedTimeSeriesStr `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Symbols []string `protobuf:"bytes,4,rep,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestStr) Reset() { *m = MinimizedWriteRequestStr{} }
func (m *MinimizedWriteRequestStr) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestStr) ProtoMessage() {}
func (*MinimizedWriteRequestStr) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3}
}
func (m *MinimizedWriteRequestStr) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestStr) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestStr.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestStr) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestStr.Merge(m, src)
}
func (m *MinimizedWriteRequestStr) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestStr) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestStr.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestStr proto.InternalMessageInfo
func (m *MinimizedWriteRequestStr) GetTimeseries() []MinimizedTimeSeriesStr {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestStr) GetSymbols() []string {
if m != nil {
return m.Symbols
}
return nil
}
type MinimizedWriteRequestStrFixed struct {
Timeseries []MinimizedTimeSeriesStrFixed `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Symbols []string `protobuf:"bytes,4,rep,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestStrFixed) Reset() { *m = MinimizedWriteRequestStrFixed{} }
func (m *MinimizedWriteRequestStrFixed) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestStrFixed) ProtoMessage() {}
func (*MinimizedWriteRequestStrFixed) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4}
}
func (m *MinimizedWriteRequestStrFixed) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestStrFixed) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestStrFixed.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestStrFixed) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestStrFixed.Merge(m, src)
}
func (m *MinimizedWriteRequestStrFixed) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestStrFixed) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestStrFixed.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestStrFixed proto.InternalMessageInfo
func (m *MinimizedWriteRequestStrFixed) GetTimeseries() []MinimizedTimeSeriesStrFixed {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestStrFixed) GetSymbols() []string {
if m != nil {
return m.Symbols
}
return nil
}
// ReadRequest represents a remote read request. // ReadRequest represents a remote read request.
type ReadRequest struct { type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"`
@ -250,7 +360,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3} return fileDescriptor_eefc82927d57d89b, []int{5}
} }
func (m *ReadRequest) XXX_Unmarshal(b []byte) error { func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -306,7 +416,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4} return fileDescriptor_eefc82927d57d89b, []int{6}
} }
func (m *ReadResponse) XXX_Unmarshal(b []byte) error { func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -356,7 +466,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5} return fileDescriptor_eefc82927d57d89b, []int{7}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -425,7 +535,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) } func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {} func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) { func (*QueryResult) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{6} return fileDescriptor_eefc82927d57d89b, []int{8}
} }
func (m *QueryResult) XXX_Unmarshal(b []byte) error { func (m *QueryResult) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -478,7 +588,7 @@ func (m *ChunkedReadResponse) Reset() { *m = ChunkedReadResponse{} }
func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) } func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) }
func (*ChunkedReadResponse) ProtoMessage() {} func (*ChunkedReadResponse) ProtoMessage() {}
func (*ChunkedReadResponse) Descriptor() ([]byte, []int) { func (*ChunkedReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{7} return fileDescriptor_eefc82927d57d89b, []int{9}
} }
func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error { func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -526,6 +636,8 @@ func init() {
proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest")
proto.RegisterType((*MinimizedWriteRequest)(nil), "prometheus.MinimizedWriteRequest") proto.RegisterType((*MinimizedWriteRequest)(nil), "prometheus.MinimizedWriteRequest")
proto.RegisterType((*MinimizedWriteRequestLen)(nil), "prometheus.MinimizedWriteRequestLen") proto.RegisterType((*MinimizedWriteRequestLen)(nil), "prometheus.MinimizedWriteRequestLen")
proto.RegisterType((*MinimizedWriteRequestStr)(nil), "prometheus.MinimizedWriteRequestStr")
proto.RegisterType((*MinimizedWriteRequestStrFixed)(nil), "prometheus.MinimizedWriteRequestStrFixed")
proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse")
proto.RegisterType((*Query)(nil), "prometheus.Query") proto.RegisterType((*Query)(nil), "prometheus.Query")
@ -536,43 +648,45 @@ func init() {
func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) } func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) }
var fileDescriptor_eefc82927d57d89b = []byte{ var fileDescriptor_eefc82927d57d89b = []byte{
// 568 bytes of a gzipped FileDescriptorProto // 608 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4b, 0x6f, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x4d, 0x6f, 0xd3, 0x4e,
0x10, 0xee, 0xd6, 0x69, 0x13, 0xc6, 0xa1, 0x32, 0xdb, 0x96, 0x9a, 0x1e, 0x9a, 0xc8, 0xe2, 0x10, 0x10, 0xc6, 0xbb, 0x75, 0xda, 0xe4, 0x3f, 0xce, 0xbf, 0x0a, 0xdb, 0x96, 0x9a, 0x4a, 0xb4, 0x95,
0xa9, 0x28, 0x88, 0x50, 0x71, 0xea, 0x81, 0xb4, 0x44, 0x0a, 0x25, 0xe6, 0xb1, 0x09, 0x02, 0x21, 0x85, 0x44, 0xa4, 0xa2, 0x20, 0x4a, 0xc5, 0xa9, 0x07, 0xda, 0x12, 0x54, 0x4a, 0xcc, 0xcb, 0x3a,
0x24, 0xcb, 0xb1, 0x47, 0x8d, 0x45, 0xfc, 0xa8, 0x77, 0x2d, 0x35, 0x9c, 0x39, 0x71, 0xe2, 0x37, 0x08, 0x84, 0x90, 0x2c, 0x27, 0x1e, 0x35, 0x16, 0xf5, 0x4b, 0x77, 0xd7, 0x52, 0xc3, 0x99, 0x13,
0x71, 0xea, 0x09, 0xf1, 0x0b, 0x10, 0xca, 0x2f, 0x41, 0x7e, 0x85, 0x0d, 0x44, 0x94, 0xdb, 0xee, 0x27, 0x0e, 0x7c, 0x22, 0x4e, 0x3d, 0x21, 0x3e, 0x01, 0x42, 0xfd, 0x24, 0xc8, 0x6f, 0x61, 0x03,
0x7c, 0x8f, 0xfd, 0x76, 0x76, 0x6c, 0xa8, 0xc7, 0xe8, 0x87, 0x02, 0xdb, 0x51, 0x1c, 0x8a, 0x90, 0x29, 0x81, 0xde, 0xec, 0x99, 0xe7, 0x79, 0xf2, 0xdb, 0xc9, 0x78, 0xa1, 0xce, 0x31, 0x88, 0x24,
0x42, 0x14, 0x87, 0x3e, 0x8a, 0x09, 0x26, 0x7c, 0x5f, 0x15, 0xb3, 0x08, 0x79, 0x0e, 0xec, 0xef, 0xb6, 0x62, 0x1e, 0xc9, 0x88, 0x42, 0xcc, 0xa3, 0x00, 0xe5, 0x00, 0x13, 0xb1, 0xaa, 0xcb, 0x61,
0x9c, 0x87, 0xe7, 0x61, 0xb6, 0xbc, 0x9f, 0xae, 0xf2, 0xaa, 0xf1, 0x85, 0x40, 0xfd, 0x4d, 0xec, 0x8c, 0x22, 0x6f, 0xac, 0x2e, 0x1d, 0x45, 0x47, 0x51, 0xf6, 0x78, 0x3b, 0x7d, 0xca, 0xab, 0xe6,
0x09, 0x64, 0x78, 0x91, 0x20, 0x17, 0xf4, 0x18, 0x40, 0x78, 0x3e, 0x72, 0x8c, 0x3d, 0xe4, 0x3a, 0x47, 0x02, 0xf5, 0x97, 0xdc, 0x97, 0xc8, 0xf0, 0x24, 0x41, 0x21, 0xe9, 0x0e, 0x80, 0xf4, 0x03,
0x69, 0x2a, 0x2d, 0xb5, 0x73, 0xbb, 0xfd, 0xdb, 0xb4, 0x3d, 0xf2, 0x7c, 0x1c, 0x66, 0xe8, 0x49, 0x14, 0xc8, 0x7d, 0x14, 0x06, 0xd9, 0xd0, 0x9a, 0xfa, 0xd6, 0xd5, 0xd6, 0xcf, 0xd0, 0x56, 0xd7,
0xe5, 0xea, 0x47, 0x63, 0x8d, 0x49, 0x7c, 0x7a, 0x0c, 0x35, 0x1f, 0x85, 0xed, 0xda, 0xc2, 0xd6, 0x0f, 0xd0, 0xce, 0xba, 0x7b, 0x95, 0xb3, 0x6f, 0xeb, 0x33, 0x4c, 0xd1, 0xd3, 0x1d, 0xa8, 0x05,
0x95, 0x4c, 0xbb, 0x2f, 0x6b, 0x4d, 0x14, 0xb1, 0xe7, 0x98, 0x05, 0xa3, 0xd0, 0x2f, 0x14, 0x67, 0x28, 0x5d, 0xcf, 0x95, 0xae, 0xa1, 0x65, 0xde, 0x55, 0xd5, 0x6b, 0xa1, 0xe4, 0x7e, 0xdf, 0x2a,
0x95, 0xda, 0xba, 0xa6, 0x18, 0x9f, 0x08, 0xec, 0x9a, 0x5e, 0xe0, 0xf9, 0xde, 0x47, 0x74, 0x97, 0x14, 0x85, 0x7f, 0xe4, 0x38, 0xac, 0xd4, 0x66, 0x1b, 0x9a, 0xf9, 0x9e, 0xc0, 0xb2, 0xe5, 0x87,
0xb2, 0xf5, 0x56, 0x64, 0x6b, 0x2c, 0xf9, 0x97, 0xb2, 0x7f, 0x86, 0xd4, 0xa1, 0xca, 0x67, 0xfe, 0x7e, 0xe0, 0xbf, 0x43, 0x6f, 0x8c, 0xad, 0x3d, 0x81, 0x6d, 0x7d, 0x2c, 0xbf, 0xb4, 0xfd, 0x11,
0x38, 0x9c, 0x72, 0xbd, 0xd2, 0x24, 0xad, 0x1b, 0xac, 0xdc, 0xe6, 0x01, 0xce, 0x2a, 0x35, 0x45, 0xd2, 0x80, 0xaa, 0x18, 0x06, 0xbd, 0xe8, 0x58, 0x18, 0x95, 0x0d, 0xd2, 0xfc, 0x8f, 0x95, 0xaf,
0xab, 0x18, 0x9f, 0x09, 0xe8, 0x2b, 0x63, 0x0c, 0x30, 0xa0, 0xfd, 0x15, 0x49, 0x8c, 0x6b, 0x92, 0x39, 0xc0, 0x61, 0xa5, 0xa6, 0x35, 0x2a, 0xe6, 0x07, 0x02, 0xc6, 0x44, 0x8c, 0x0e, 0x86, 0xf4,
0x0c, 0x30, 0xb8, 0x3e, 0x4c, 0x7d, 0x75, 0x98, 0x6f, 0x04, 0x54, 0x86, 0xb6, 0x5b, 0x76, 0xe2, 0x60, 0x02, 0x89, 0x39, 0x85, 0xa4, 0x83, 0xe1, 0x74, 0x98, 0xfa, 0x3f, 0xc2, 0xd8, 0x92, 0x5f,
0x10, 0xaa, 0x17, 0x89, 0x7c, 0xf8, 0x2d, 0xf9, 0xf0, 0x57, 0x09, 0xc6, 0x33, 0x56, 0x32, 0xe8, 0x0a, 0xc6, 0x96, 0x7c, 0x1a, 0x8c, 0x76, 0xd1, 0x64, 0x3e, 0x11, 0xb8, 0x7e, 0x11, 0xcc, 0x43,
0x7b, 0xd8, 0xb3, 0x1d, 0x07, 0x23, 0x81, 0xae, 0x15, 0x23, 0x8f, 0xc2, 0x80, 0xa3, 0x95, 0x8d, 0xff, 0x14, 0x3d, 0x6a, 0x4d, 0x20, 0xba, 0x39, 0x9d, 0x28, 0x33, 0x5f, 0x1a, 0xeb, 0x0b, 0x01,
0x86, 0xbe, 0xde, 0x54, 0x5a, 0x5b, 0x9d, 0xbb, 0xb2, 0x58, 0x3a, 0xa6, 0xcd, 0x0a, 0xf6, 0x68, 0x9d, 0xa1, 0xeb, 0x95, 0xdb, 0xb2, 0x09, 0xd5, 0x93, 0x44, 0x25, 0xb8, 0xa2, 0x12, 0x3c, 0x4f,
0x16, 0x21, 0xdb, 0x2d, 0x4d, 0xe4, 0x2a, 0x37, 0x8e, 0xa0, 0x2e, 0x17, 0xa8, 0x0a, 0xd5, 0x61, 0x90, 0x0f, 0x59, 0xa9, 0xa0, 0x6f, 0x60, 0xc5, 0xed, 0xf7, 0x31, 0x96, 0xe8, 0x39, 0x1c, 0x45,
0xd7, 0x7c, 0x39, 0xe8, 0x0d, 0xb5, 0x35, 0xba, 0x07, 0xdb, 0xc3, 0x11, 0xeb, 0x75, 0xcd, 0xde, 0x1c, 0x85, 0x02, 0x9d, 0xec, 0xf3, 0x31, 0x66, 0x37, 0xb4, 0xe6, 0xc2, 0xd6, 0x0d, 0xd5, 0xac,
0x13, 0xeb, 0xed, 0x0b, 0x66, 0x9d, 0xf6, 0x5f, 0x3f, 0x7f, 0x36, 0xd4, 0x88, 0xd1, 0x4d, 0x55, 0xfc, 0x4c, 0x8b, 0x15, 0xea, 0xee, 0x30, 0x46, 0xb6, 0x5c, 0x86, 0xa8, 0x55, 0x61, 0x6e, 0x43,
0xf6, 0xc2, 0x8a, 0x3e, 0x80, 0x6a, 0x8c, 0x3c, 0x99, 0x8a, 0xf2, 0x42, 0x7b, 0x7f, 0x5f, 0x28, 0x5d, 0x2d, 0x50, 0x1d, 0xaa, 0xf6, 0xae, 0xf5, 0xac, 0xd3, 0xb6, 0x1b, 0x33, 0x74, 0x05, 0x16,
0xc3, 0x59, 0xc9, 0x33, 0xbe, 0x12, 0xd8, 0xc8, 0x00, 0x7a, 0x0f, 0x28, 0x17, 0x76, 0x2c, 0xac, 0xed, 0x2e, 0x6b, 0xef, 0x5a, 0xed, 0x07, 0xce, 0xab, 0xa7, 0xcc, 0xd9, 0x3f, 0x78, 0xf1, 0xe4,
0xac, 0xaf, 0xc2, 0xf6, 0x23, 0xcb, 0x4f, 0x7d, 0x48, 0x4b, 0x61, 0x5a, 0x86, 0x8c, 0x4a, 0xc0, 0xb1, 0xdd, 0x20, 0xe6, 0x6e, 0xea, 0x72, 0x47, 0x51, 0xf4, 0x0e, 0x54, 0x39, 0x8a, 0xe4, 0x58,
0xe4, 0xb4, 0x05, 0x1a, 0x06, 0xee, 0x32, 0x77, 0x3d, 0xe3, 0x6e, 0x61, 0xe0, 0xca, 0xcc, 0x23, 0x96, 0x07, 0x5a, 0xf9, 0xfd, 0x40, 0x59, 0x9f, 0x95, 0x3a, 0xf3, 0x33, 0x81, 0xb9, 0xac, 0x41,
0xa8, 0xf9, 0xb6, 0x70, 0x26, 0x18, 0xf3, 0x62, 0x9a, 0x75, 0x39, 0xd5, 0xc0, 0x1e, 0xe3, 0xd4, 0x6f, 0x01, 0x15, 0xd2, 0xe5, 0xd2, 0xc9, 0xe6, 0x2a, 0xdd, 0x20, 0x76, 0x82, 0x34, 0x87, 0x34,
0xcc, 0x09, 0x6c, 0xc1, 0xa4, 0x87, 0xb0, 0x31, 0xf1, 0x02, 0x91, 0xbf, 0xa7, 0xda, 0xd9, 0xfd, 0x35, 0xd6, 0xc8, 0x3a, 0xdd, 0xb2, 0x61, 0x09, 0xda, 0x84, 0x06, 0x86, 0xde, 0xb8, 0x76, 0x36,
0xb3, 0xb9, 0xfd, 0x14, 0x64, 0x39, 0xc7, 0xe8, 0x81, 0x2a, 0x5d, 0x8e, 0x3e, 0xfa, 0xff, 0xaf, 0xd3, 0x2e, 0x60, 0xe8, 0xa9, 0xca, 0x6d, 0xa8, 0x05, 0xae, 0xec, 0x0f, 0x90, 0x8b, 0xe2, 0x8b,
0x4f, 0x9e, 0x22, 0xe3, 0x12, 0xb6, 0x4f, 0x27, 0x49, 0xf0, 0x21, 0x7d, 0x1c, 0xa9, 0xab, 0x8f, 0x37, 0x54, 0xaa, 0x8e, 0xdb, 0xc3, 0x63, 0x2b, 0x17, 0xb0, 0x91, 0x92, 0x6e, 0xc2, 0xdc, 0xc0,
0x61, 0xcb, 0xc9, 0xcb, 0xd6, 0x92, 0xe5, 0x1d, 0xd9, 0xb2, 0x10, 0x16, 0xae, 0x37, 0x1d, 0x79, 0x0f, 0x65, 0xbe, 0xf3, 0xfa, 0xd6, 0xf2, 0xaf, 0xc3, 0x3d, 0x48, 0x9b, 0x2c, 0xd7, 0x98, 0x6d,
0x4b, 0x1b, 0xa0, 0xa6, 0x63, 0x34, 0xb3, 0xbc, 0xc0, 0xc5, 0xcb, 0xa2, 0x4f, 0x90, 0x95, 0x9e, 0xd0, 0x95, 0xc3, 0xd1, 0x7b, 0x7f, 0x7f, 0x43, 0xa9, 0x5b, 0x64, 0x9e, 0xc2, 0xe2, 0xfe, 0x20,
0xa6, 0x95, 0x93, 0x9d, 0xab, 0xf9, 0x01, 0xf9, 0x3e, 0x3f, 0x20, 0x3f, 0xe7, 0x07, 0xe4, 0xdd, 0x09, 0xdf, 0xa6, 0x7f, 0x8e, 0x32, 0xd5, 0xfb, 0xb0, 0xd0, 0xcf, 0xcb, 0xce, 0x58, 0xe4, 0x35,
0x66, 0xea, 0x1b, 0x8d, 0xc7, 0x9b, 0xd9, 0xdf, 0xe5, 0xe1, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x35, 0xb2, 0x30, 0x16, 0xa9, 0xff, 0xf7, 0xd5, 0x57, 0xba, 0x0e, 0x7a, 0xba, 0x46, 0x43, 0xc7,
0xf1, 0x65, 0x72, 0x0c, 0x9c, 0x04, 0x00, 0x00, 0x0f, 0x3d, 0x3c, 0x2d, 0xe6, 0x04, 0x59, 0xe9, 0x51, 0x5a, 0xd9, 0x5b, 0x3a, 0x3b, 0x5f, 0x23,
0x5f, 0xcf, 0xd7, 0xc8, 0xf7, 0xf3, 0x35, 0xf2, 0x7a, 0x3e, 0xcd, 0x8d, 0x7b, 0xbd, 0xf9, 0xec,
0x06, 0xbe, 0xfb, 0x23, 0x00, 0x00, 0xff, 0xff, 0xf6, 0xd2, 0x59, 0xc2, 0xc0, 0x05, 0x00, 0x00,
} }
func (m *WriteRequest) Marshal() (dAtA []byte, err error) { func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
@ -726,6 +840,106 @@ func (m *MinimizedWriteRequestLen) MarshalToSizedBuffer(dAtA []byte) (int, error
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *MinimizedWriteRequestStr) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestStr) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestStr) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x22
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestStrFixed) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestStrFixed) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestStrFixed) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x22
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) { func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
@ -1055,6 +1269,54 @@ func (m *MinimizedWriteRequestLen) Size() (n int) {
return n return n
} }
func (m *MinimizedWriteRequestStr) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if len(m.Symbols) > 0 {
for _, s := range m.Symbols {
l = len(s)
n += 1 + l + sovRemote(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *MinimizedWriteRequestStrFixed) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if len(m.Symbols) > 0 {
for _, s := range m.Symbols {
l = len(s)
n += 1 + l + sovRemote(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *ReadRequest) Size() (n int) { func (m *ReadRequest) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
@ -1526,6 +1788,240 @@ func (m *MinimizedWriteRequestLen) Unmarshal(dAtA []byte) error {
} }
return nil return nil
} }
func (m *MinimizedWriteRequestStr) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestStr: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestStr: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesStr{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequestStrFixed) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestStrFixed: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestStrFixed: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesStrFixed{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadRequest) Unmarshal(dAtA []byte) error { func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0

View file

@ -51,6 +51,20 @@ message MinimizedWriteRequestLen {
bytes symbols = 4; bytes symbols = 4;
} }
message MinimizedWriteRequestStr {
repeated MinimizedTimeSeriesStr timeseries = 1 [(gogoproto.nullable) = false];
reserved 2;
reserved 3;
repeated string symbols = 4;
}
message MinimizedWriteRequestStrFixed {
repeated MinimizedTimeSeriesStrFixed timeseries = 1 [(gogoproto.nullable) = false];
reserved 2;
reserved 3;
repeated string symbols = 4;
}
// ReadRequest represents a remote read request. // ReadRequest represents a remote read request.
message ReadRequest { message ReadRequest {
repeated Query queries = 1; repeated Query queries = 1;

File diff suppressed because it is too large Load diff

View file

@ -159,6 +159,31 @@ message MinimizedTimeSeriesLen {
// TODO: add metadata // TODO: add metadata
} }
message MinimizedTimeSeriesStr {
// Sorted list of label name-value pair references, encoded as indices to a strings array.
// This list's len is always multiple of 2.
repeated uint32 label_symbols = 1;
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message MinimizedTimeSeriesStrFixed {
// Sorted list of label name-value pair references, encoded as indices to a strings array.
// This list's len is always multiple of 2.
repeated fixed32 label_symbols = 1;
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message Label { message Label {
string name = 1; string name = 1;
string value = 2; string value = 2;

View file

@ -9,6 +9,8 @@ declare -a INSTANCES
INSTANCES+=('sender-v1;;receiver-v1;') INSTANCES+=('sender-v1;;receiver-v1;')
INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-v11-min32-optimized-varint;--remote-write-format 1') INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-v11-min32-optimized-varint;--remote-write-format 1')
INSTANCES+=('sender-v11-min-len;--remote-write-format 2;receiver-v11-min-len;--remote-write-format 2') INSTANCES+=('sender-v11-min-len;--remote-write-format 2;receiver-v11-min-len;--remote-write-format 2')
INSTANCES+=('sender-v11-min-str;--remote-write-format 3;receiver-v11-min-str;--remote-write-format 3')
INSTANCES+=('sender-v11-min-str-fixed;--remote-write-format 4;receiver-v11-min-str-fixed;--remote-write-format 4')
# ~~~~~~~~~~~~~ # ~~~~~~~~~~~~~

View file

@ -807,6 +807,17 @@ func labelsToUint32SliceLen(lbls labels.Labels, symbolTable *rwSymbolTable, buf
return result return result
} }
func labelsToUint32SliceStr(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
result := buf[:0]
lbls.Range(func(l labels.Label) {
off := symbolTable.RefStr(l.Name)
result = append(result, off)
off = symbolTable.RefStr(l.Value)
result = append(result, off)
})
return result
}
func Uint32LenRefToLabels(symbols []byte, minLabels []uint32) labels.Labels { func Uint32LenRefToLabels(symbols []byte, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2) ls := labels.NewScratchBuilder(len(minLabels) / 2)
@ -830,6 +841,23 @@ func Uint32LenRefToLabels(symbols []byte, minLabels []uint32) labels.Labels {
return ls.Labels() return ls.Labels()
} }
func Uint32StrRefToLabels(symbols []string, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
strIdx := 0
for strIdx < len(minLabels) {
// todo, check for overflow?
nameIdx := minLabels[strIdx]
strIdx++
valueIdx := minLabels[strIdx]
strIdx++
ls.Add(symbols[nameIdx], symbols[valueIdx])
}
return ls.Labels()
}
func yoloString(b []byte) string { func yoloString(b []byte) string {
return *(*string)(unsafe.Pointer(&b)) return *(*string)(unsafe.Pointer(&b))
} }
@ -982,6 +1010,44 @@ func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestL
return &req, nil return &req, nil
} }
func DecodeMinimizedWriteRequestStr(r io.Reader) (*prompb.MinimizedWriteRequestStr, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestStr
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func DecodeMinimizedWriteRequestStrFixed(r io.Reader) (*prompb.MinimizedWriteRequestStrFixed, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestStrFixed
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) { func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{ req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)), Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),

View file

@ -395,6 +395,8 @@ const (
Base1 RemoteWriteFormat = iota // original map based format Base1 RemoteWriteFormat = iota // original map based format
Min32Optimized // two 32bit varint plus marshalling optimization Min32Optimized // two 32bit varint plus marshalling optimization
MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice) MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice)
MinStrings // symbols are indices into an array of strings
MinStringsFix // symbols are indices into an array of strings. Indices are fixed length
) )
// QueueManager manages a queue of samples to be sent to the Storage // QueueManager manages a queue of samples to be sent to the Storage
@ -1392,6 +1394,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pendingMinLenData[i].Samples = []prompb.Sample{{}} pendingMinLenData[i].Samples = []prompb.Sample{{}}
} }
pendingMinStrData := make([]prompb.MinimizedTimeSeriesStr, max)
for i := range pendingMinStrData {
pendingMinStrData[i].Samples = []prompb.Sample{{}}
}
pendingMinStrFixedData := make([]prompb.MinimizedTimeSeriesStrFixed, max)
for i := range pendingMinStrFixedData {
pendingMinStrFixedData[i].Samples = []prompb.Sample{{}}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() { stop := func() {
if !timer.Stop() { if !timer.Stop() {
@ -1441,6 +1453,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear() symbolTable.clear()
case MinStrings:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear()
case MinStringsFix:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStrFixed(&symbolTable, batch, pendingMinStrFixedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrFixedSamples(ctx, pendingMinStrFixedData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
@ -1472,6 +1494,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear() symbolTable.clear()
case MinStrings:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear()
case MinStringsFix:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStrFixed(&symbolTable, batch, pendingMinStrFixedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrFixedSamples(ctx, pendingMinStrFixedData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
} }
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
@ -1556,6 +1588,30 @@ func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.Minimiz
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinStrSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesStr, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestStr(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinStrFixedSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesStrFixed, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestStrFixed(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) { func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) {
if err != nil { if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
@ -1716,6 +1772,90 @@ func populateMinimizedTimeSeriesLen(symbolTable *rwSymbolTable, batch []timeSeri
return nPendingSamples, nPendingExemplars, nPendingHistograms return nPendingSamples, nPendingExemplars, nPendingHistograms
} }
func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesStr, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func populateMinimizedTimeSeriesStrFixed(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesStrFixed, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff backoff := cfg.MinBackoff
sleepDuration := model.Duration(0) sleepDuration := model.Duration(0)
@ -1821,6 +1961,7 @@ type offLenPair struct {
type rwSymbolTable struct { type rwSymbolTable struct {
symbols []byte symbols []byte
strings []string
symbolsMap map[string]offLenPair symbolsMap map[string]offLenPair
symbolsMapBytes map[string]uint32 symbolsMapBytes map[string]uint32
} }
@ -1859,15 +2000,30 @@ func (r *rwSymbolTable) RefLen(str string) uint32 {
return ref return ref
} }
func (r *rwSymbolTable) RefStr(str string) uint32 {
if ref, ok := r.symbolsMapBytes[str]; ok {
return ref
}
ref := uint32(len(r.strings))
r.strings = append(r.strings, str)
r.symbolsMapBytes[str] = ref
return ref
}
func (r *rwSymbolTable) LabelsString() string { func (r *rwSymbolTable) LabelsString() string {
return *((*string)(unsafe.Pointer(&r.symbols))) return *((*string)(unsafe.Pointer(&r.symbols)))
} }
func (r *rwSymbolTable) LabelsStrings() []string {
return r.strings
}
func (r *rwSymbolTable) LabelsData() []byte { func (r *rwSymbolTable) LabelsData() []byte {
return r.symbols return r.symbols
} }
func (r *rwSymbolTable) clear() { func (r *rwSymbolTable) clear() {
r.strings = r.strings[:0]
for k := range r.symbolsMap { for k := range r.symbolsMap {
delete(r.symbolsMap, k) delete(r.symbolsMap, k)
} }
@ -1967,3 +2123,95 @@ func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labe
} }
return compressed, highest, nil return compressed, highest, nil
} }
func buildMinimizedWriteRequestStr(samples []prompb.MinimizedTimeSeriesStr, labels []string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequestStr{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = &[]byte{} // For convenience in tests. Not efficient.
}
data, err := req.OptimizedMarshal(*pBuf)
if err != nil {
return nil, 0, err
}
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, data)
if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
func buildMinimizedWriteRequestStrFixed(samples []prompb.MinimizedTimeSeriesStrFixed, labels []string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequestStrFixed{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}

View file

@ -75,6 +75,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req *prompb.WriteRequest var req *prompb.WriteRequest
var reqMin *prompb.MinimizedWriteRequest var reqMin *prompb.MinimizedWriteRequest
var reqMinLen *prompb.MinimizedWriteRequestLen var reqMinLen *prompb.MinimizedWriteRequestLen
var reqMinStr *prompb.MinimizedWriteRequestStr
var reqMinFixed *prompb.MinimizedWriteRequestStrFixed
// TODO: this should eventually be done via content negotiation/looking at the header // TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat { switch h.rwFormat {
@ -84,6 +86,10 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
reqMin, err = DecodeMinimizedWriteRequest(r.Body) reqMin, err = DecodeMinimizedWriteRequest(r.Body)
case MinLen: case MinLen:
reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body) reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body)
case MinStrings:
reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body)
case MinStringsFix:
reqMinFixed, err = DecodeMinimizedWriteRequestStrFixed(r.Body)
} }
if err != nil { if err != nil {
@ -100,6 +106,10 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = h.writeMin(r.Context(), reqMin) err = h.writeMin(r.Context(), reqMin)
case MinLen: case MinLen:
err = h.writeMinLen(r.Context(), reqMinLen) err = h.writeMinLen(r.Context(), reqMinLen)
case MinStrings:
err = h.writeMinStr(r.Context(), reqMinStr)
case MinStringsFix:
err = h.writeMinStrFixed(r.Context(), reqMinFixed)
} }
switch { switch {
@ -370,3 +380,79 @@ func (h *writeHandler) writeMinLen(ctx context.Context, req *prompb.MinimizedWri
return nil return nil
} }
func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWriteRequestStr) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32StrRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}
func (h *writeHandler) writeMinStrFixed(ctx context.Context, req *prompb.MinimizedWriteRequestStrFixed) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32StrRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}