Signed-off-by: bwplotka <bwplotka@gmail.com>
bwplotka 2023-11-24 12:00:44 +00:00
parent a3c6904243
commit c114afa6e6
13 changed files with 641 additions and 1823 deletions


@ -40,7 +40,7 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
return r.Marshal() return r.Marshal()
} }
func (m *MinimizedWriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) { func (m *WriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size() siz := m.Size()
if cap(dst) < siz { if cap(dst) < siz {
dst = make([]byte, siz) dst = make([]byte, siz)
@ -55,8 +55,8 @@ func (m *MinimizedWriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
} }
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer, // OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries. // but calls OptimizedMarshalToSizedBuffer on the InternedTimeseries.
func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) { func (m *WriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA) i := len(dAtA)
_ = i _ = i
var l int var l int
@ -65,12 +65,21 @@ func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int,
i -= len(m.XXX_unrecognized) i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized)
} }
if len(m.Symbols2) > 0 {
for iNdEx := len(m.Symbols2) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols2[iNdEx])
copy(dAtA[i:], m.Symbols2[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols2[iNdEx])))
i--
dAtA[i] = 0x2a
}
}
if len(m.Symbols) > 0 { if len(m.Symbols) > 0 {
i -= len(m.Symbols) i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols) copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols))) i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i-- i--
dAtA[i] = 0x22 dAtA[i] = 0x22 // Depends on the field position.
} }
if len(m.Timeseries) > 0 { if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
@ -91,7 +100,7 @@ func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int,
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer, // OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelSymbols in place without extra allocations. // but marshals m.LabelSymbols in place without extra allocations.
func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) { func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA) i := len(dAtA)
_ = i _ = i
var l int var l int
@ -100,6 +109,31 @@ func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, e
i -= len(m.XXX_unrecognized) i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized)
} }
if len(m.LabelSymbols) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelSymbols {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0x52
}
if len(m.Histograms) > 0 { if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- { for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{ {
@ -142,28 +176,7 @@ func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, e
dAtA[i] = 0x12 dAtA[i] = 0x12
} }
} }
if len(m.LabelSymbols) > 0 { // TODO: Metadata & created ts.
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelSymbols {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
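
The LabelSymbols branch above packs the repeated uint32 field without a scratch buffer: each varint is emitted LSB-first while walking backwards through dAtA, so the written span lands byte-reversed, and a single slices.Reverse fixes the order in place; j10 counts the payload bytes for the length prefix, and the tag byte 0x52 is just (field 10 << 3) | wire type 2, which is why moving label_symbols away from field 1 (tag 0xa) only changes that constant. A standalone sketch of the same trick, with illustrative names (requires Go 1.21+ for the slices package):

package main

import (
	"fmt"
	"slices"
)

// packVarintsReversed writes the packed varint encoding of nums into the tail
// of buf ending at index end. Each varint is emitted in normal LSB-first byte
// order but at decreasing indexes, so the span comes out byte-reversed; one
// slices.Reverse over the span restores the correct order in place. It returns
// the new start index; end-start is the payload length that would feed the
// packed field's length prefix.
func packVarintsReversed(buf []byte, end int, nums []uint32) int {
	i := end
	for _, num := range nums {
		n := uint64(num)
		for n >= 1<<7 {
			i--
			buf[i] = byte(n&0x7f | 0x80)
			n >>= 7
		}
		i--
		buf[i] = byte(n)
	}
	slices.Reverse(buf[i:end])
	return i
}

func main() {
	buf := make([]byte, 16)
	i := packVarintsReversed(buf, len(buf), []uint32{0, 10, 300})
	fmt.Printf("payload: % x (len %d)\n", buf[i:], len(buf)-i)
	// payload: 00 0a ac 02 (len 4) -- 300 is the two-byte varint ac 02.
}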


@ -23,16 +23,16 @@ func TestOptimizedMarshal(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
m *MinimizedWriteRequest m *WriteRequest
}{ }{
// { // {
// name: "empty", // name: "empty",
// m: &MinimizedWriteRequest{}, // m: &MinimizedWriteRequest{},
// }, // },
{ {
name: "simple", name: "s1",
m: &MinimizedWriteRequest{ m: &WriteRequest{
Timeseries: []MinimizedTimeSeries{ Timeseries: []TimeSeries{
{ {
LabelSymbols: []uint32{ LabelSymbols: []uint32{
0, 10, 0, 10,
@ -68,11 +68,48 @@ func TestOptimizedMarshal(t *testing.T) {
Symbols: "abcdefghijabcdefghijabcdefghijabcdefghij", Symbols: "abcdefghijabcdefghijabcdefghijabcdefghij",
}, },
}, },
{
name: "s2",
m: &WriteRequest{
Timeseries: []TimeSeries{
{
LabelSymbols: []uint32{
0,
1,
3,
4,
5,
6,
},
Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: nil,
},
{
LabelSymbols: []uint32{
0,
2,
3,
4,
5,
6,
},
Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: nil,
},
},
Symbols2: []string{
"foo", "bar",
"rab", "__name__",
"xyz", "bbb", "xaa",
},
},
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got = got[:0]
// should be the same as the standard marshal // should be the same as the standard marshal
expected, err := tt.m.Marshal() expected, err := tt.m.Marshal()
require.NoError(t, err) require.NoError(t, err)
@ -81,7 +118,7 @@ func TestOptimizedMarshal(t *testing.T) {
require.Equal(t, expected, got) require.Equal(t, expected, got)
// round trip // round trip
m := &MinimizedWriteRequest{} m := &WriteRequest{}
require.NoError(t, m.Unmarshal(got)) require.NoError(t, m.Unmarshal(got))
require.Equal(t, tt.m, m) require.Equal(t, tt.m, m)
}) })


@ -60,15 +60,18 @@ func (x ReadRequest_ResponseType) String() string {
} }
func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) { func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3, 0} return fileDescriptor_eefc82927d57d89b, []int{1, 0}
} }
type WriteRequest struct { type WriteRequest struct {
Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Metadata []MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata"` // The symbols table. All symbols are concatenated strings. To read the symbols table, it's required
XXX_NoUnkeyedLiteral struct{} `json:"-"` // to know the offset:length range of the actual symbol to read from this string.
XXX_unrecognized []byte `json:"-"` Symbols string `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_sizecache int32 `json:"-"` Symbols2 []string `protobuf:"bytes,5,rep,name=symbols2,proto3" json:"symbols2,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
} }
func (m *WriteRequest) Reset() { *m = WriteRequest{} } func (m *WriteRequest) Reset() { *m = WriteRequest{} }
@ -111,123 +114,16 @@ func (m *WriteRequest) GetTimeseries() []TimeSeries {
return nil return nil
} }
func (m *WriteRequest) GetMetadata() []MetricMetadata { func (m *WriteRequest) GetSymbols() string {
if m != nil {
return m.Metadata
}
return nil
}
type MinimizedWriteRequest struct {
Timeseries []MinimizedTimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings. To read the symbols table, it's required
// to know the offset:length range of the actual symbol to read from this string.
Symbols string `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequest) Reset() { *m = MinimizedWriteRequest{} }
func (m *MinimizedWriteRequest) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequest) ProtoMessage() {}
func (*MinimizedWriteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{1}
}
func (m *MinimizedWriteRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequest.Merge(m, src)
}
func (m *MinimizedWriteRequest) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequest) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequest.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequest proto.InternalMessageInfo
func (m *MinimizedWriteRequest) GetTimeseries() []MinimizedTimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequest) GetSymbols() string {
if m != nil { if m != nil {
return m.Symbols return m.Symbols
} }
return "" return ""
} }
type MinimizedWriteRequestLen struct { func (m *WriteRequest) GetSymbols2() []string {
Timeseries []MinimizedTimeSeriesLen `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
Symbols []byte `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestLen) Reset() { *m = MinimizedWriteRequestLen{} }
func (m *MinimizedWriteRequestLen) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestLen) ProtoMessage() {}
func (*MinimizedWriteRequestLen) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{2}
}
func (m *MinimizedWriteRequestLen) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestLen) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestLen.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestLen) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestLen.Merge(m, src)
}
func (m *MinimizedWriteRequestLen) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestLen) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestLen.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestLen proto.InternalMessageInfo
func (m *MinimizedWriteRequestLen) GetTimeseries() []MinimizedTimeSeriesLen {
if m != nil { if m != nil {
return m.Timeseries return m.Symbols2
}
return nil
}
func (m *MinimizedWriteRequestLen) GetSymbols() []byte {
if m != nil {
return m.Symbols
} }
return nil return nil
} }
@ -250,7 +146,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3} return fileDescriptor_eefc82927d57d89b, []int{1}
} }
func (m *ReadRequest) XXX_Unmarshal(b []byte) error { func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -306,7 +202,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4} return fileDescriptor_eefc82927d57d89b, []int{2}
} }
func (m *ReadResponse) XXX_Unmarshal(b []byte) error { func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -356,7 +252,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5} return fileDescriptor_eefc82927d57d89b, []int{3}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -425,7 +321,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) } func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {} func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) { func (*QueryResult) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{6} return fileDescriptor_eefc82927d57d89b, []int{4}
} }
func (m *QueryResult) XXX_Unmarshal(b []byte) error { func (m *QueryResult) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -478,7 +374,7 @@ func (m *ChunkedReadResponse) Reset() { *m = ChunkedReadResponse{} }
func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) } func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) }
func (*ChunkedReadResponse) ProtoMessage() {} func (*ChunkedReadResponse) ProtoMessage() {}
func (*ChunkedReadResponse) Descriptor() ([]byte, []int) { func (*ChunkedReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{7} return fileDescriptor_eefc82927d57d89b, []int{5}
} }
func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error { func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -524,8 +420,6 @@ func (m *ChunkedReadResponse) GetQueryIndex() int64 {
func init() { func init() {
proto.RegisterEnum("prometheus.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value) proto.RegisterEnum("prometheus.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value)
proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest")
proto.RegisterType((*MinimizedWriteRequest)(nil), "prometheus.MinimizedWriteRequest")
proto.RegisterType((*MinimizedWriteRequestLen)(nil), "prometheus.MinimizedWriteRequestLen")
proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse")
proto.RegisterType((*Query)(nil), "prometheus.Query") proto.RegisterType((*Query)(nil), "prometheus.Query")
@ -536,43 +430,39 @@ func init() {
func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) } func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) }
var fileDescriptor_eefc82927d57d89b = []byte{ var fileDescriptor_eefc82927d57d89b = []byte{
// 568 bytes of a gzipped FileDescriptorProto // 506 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4b, 0x6f, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xee, 0xd6, 0x69, 0x13, 0xc6, 0xa1, 0x32, 0xdb, 0x96, 0x9a, 0x1e, 0x9a, 0xc8, 0xe2, 0x10, 0x10, 0xee, 0xd6, 0x49, 0x93, 0x8e, 0x43, 0x64, 0xb6, 0x2d, 0x31, 0x39, 0xa4, 0x91, 0xc5, 0xc1,
0xa9, 0x28, 0x88, 0x50, 0x71, 0xea, 0x81, 0xb4, 0x44, 0x0a, 0x25, 0xe6, 0xb1, 0x09, 0x02, 0x21, 0x52, 0x51, 0x10, 0xa1, 0xe2, 0xc4, 0x81, 0xb4, 0x44, 0x2a, 0xa5, 0xe1, 0x67, 0x13, 0x04, 0x42,
0x24, 0xcb, 0xb1, 0x47, 0x8d, 0x45, 0xfc, 0xa8, 0x77, 0x2d, 0x35, 0x9c, 0x39, 0x71, 0xe2, 0x37, 0x48, 0x96, 0x93, 0x8c, 0x1a, 0x8b, 0xf8, 0xa7, 0xbb, 0x6b, 0xa9, 0x79, 0x09, 0x1e, 0x8a, 0x53,
0x71, 0xea, 0x09, 0xf1, 0x0b, 0x10, 0xca, 0x2f, 0x41, 0x7e, 0x85, 0x0d, 0x44, 0x94, 0xdb, 0xee, 0x4f, 0x88, 0x27, 0x40, 0x28, 0x4f, 0x82, 0xbc, 0xb6, 0xc3, 0x16, 0x2e, 0xdc, 0x66, 0xbe, 0xef,
0x7c, 0x8f, 0xfd, 0x76, 0x76, 0x6c, 0xa8, 0xc7, 0xe8, 0x87, 0x02, 0xdb, 0x51, 0x1c, 0x8a, 0x90, 0x9b, 0xcf, 0x33, 0xb3, 0x63, 0x68, 0x70, 0x0c, 0x63, 0x89, 0xbd, 0x84, 0xc7, 0x32, 0xa6, 0x90,
0x42, 0x14, 0x87, 0x3e, 0x8a, 0x09, 0x26, 0x7c, 0x5f, 0x15, 0xb3, 0x08, 0x79, 0x0e, 0xec, 0xef, 0xf0, 0x38, 0x44, 0xb9, 0xc0, 0x54, 0xb4, 0x4d, 0xb9, 0x4a, 0x50, 0xe4, 0x44, 0x7b, 0xff, 0x32,
0x9c, 0x87, 0xe7, 0x61, 0xb6, 0xbc, 0x9f, 0xae, 0xf2, 0xaa, 0xf1, 0x85, 0x40, 0xfd, 0x4d, 0xec, 0xbe, 0x8c, 0x55, 0xf8, 0x28, 0x8b, 0x72, 0xd4, 0xf9, 0x4a, 0xa0, 0xf1, 0x81, 0x07, 0x12, 0x19,
0x09, 0x64, 0x78, 0x91, 0x20, 0x17, 0xf4, 0x18, 0x40, 0x78, 0x3e, 0x72, 0x8c, 0x3d, 0xe4, 0x3a, 0x5e, 0xa5, 0x28, 0x24, 0x7d, 0x06, 0x20, 0x83, 0x10, 0x05, 0xf2, 0x00, 0x85, 0x4d, 0xba, 0x86,
0x69, 0x2a, 0x2d, 0xb5, 0x73, 0xbb, 0xfd, 0xdb, 0xb4, 0x3d, 0xf2, 0x7c, 0x1c, 0x66, 0xe8, 0x49, 0x6b, 0xf6, 0xef, 0xf5, 0xfe, 0x98, 0xf6, 0x26, 0x41, 0x88, 0x63, 0xc5, 0x9e, 0x54, 0x6e, 0x7e,
0xe5, 0xea, 0x47, 0x63, 0x8d, 0x49, 0x7c, 0x7a, 0x0c, 0x35, 0x1f, 0x85, 0xed, 0xda, 0xc2, 0xd6, 0x1e, 0x6e, 0x31, 0x4d, 0x4f, 0x6d, 0xa8, 0x89, 0x55, 0x38, 0x8d, 0x97, 0xc2, 0xae, 0x74, 0x89,
0x95, 0x4c, 0xbb, 0x2f, 0x6b, 0x4d, 0x14, 0xb1, 0xe7, 0x98, 0x05, 0xa3, 0xd0, 0x2f, 0x14, 0x67, 0xbb, 0xcb, 0xca, 0x94, 0xb6, 0xa1, 0x5e, 0x84, 0x7d, 0xbb, 0xda, 0x35, 0xdc, 0x5d, 0xb6, 0xc9,
0x95, 0xda, 0xba, 0xa6, 0x18, 0x9f, 0x08, 0xec, 0x9a, 0x5e, 0xe0, 0xf9, 0xde, 0x47, 0x74, 0x97, 0xcf, 0x2b, 0xf5, 0x6d, 0xcb, 0x38, 0xaf, 0xd4, 0x0d, 0xab, 0xe2, 0x7c, 0x27, 0x60, 0x32, 0xf4,
0xb2, 0xf5, 0x56, 0x64, 0x6b, 0x2c, 0xf9, 0x97, 0xb2, 0x7f, 0x86, 0xd4, 0xa1, 0xca, 0x67, 0xfe, 0xe7, 0x65, 0x3f, 0x47, 0x50, 0xbb, 0x4a, 0xf5, 0x66, 0xee, 0xea, 0xcd, 0xbc, 0x4b, 0x91, 0xaf,
0x38, 0x9c, 0x72, 0xbd, 0xd2, 0x24, 0xad, 0x1b, 0xac, 0xdc, 0xe6, 0x01, 0xce, 0x2a, 0x35, 0x45, 0x58, 0xa9, 0xa0, 0x9f, 0xa1, 0xe5, 0xcf, 0x66, 0x98, 0x48, 0x9c, 0x7b, 0x1c, 0x45, 0x12, 0x47,
0xab, 0x18, 0x9f, 0x09, 0xe8, 0x2b, 0x63, 0x0c, 0x30, 0xa0, 0xfd, 0x15, 0x49, 0x8c, 0x6b, 0x92, 0x02, 0x3d, 0xb5, 0x04, 0x7b, 0xbb, 0x6b, 0xb8, 0xcd, 0xfe, 0x03, 0xbd, 0x58, 0xfb, 0x4c, 0x8f,
0x0c, 0x30, 0xb8, 0x3e, 0x4c, 0x7d, 0x75, 0x98, 0x6f, 0x04, 0x54, 0x86, 0xb6, 0x5b, 0x76, 0xe2, 0x15, 0xea, 0xc9, 0x2a, 0x41, 0x76, 0x50, 0x9a, 0xe8, 0xa8, 0x70, 0x8e, 0xa1, 0xa1, 0x03, 0xd4,
0x10, 0xaa, 0x17, 0x89, 0x7c, 0xf8, 0x2d, 0xf9, 0xf0, 0x57, 0x09, 0xc6, 0x33, 0x56, 0x32, 0xe8, 0x84, 0xda, 0x78, 0x30, 0x7a, 0x7b, 0x31, 0x1c, 0x5b, 0x5b, 0xb4, 0x05, 0x7b, 0xe3, 0x09, 0x1b,
0x7b, 0xd8, 0xb3, 0x1d, 0x07, 0x23, 0x81, 0xae, 0x15, 0x23, 0x8f, 0xc2, 0x80, 0xa3, 0x95, 0x8d, 0x0e, 0x46, 0xc3, 0x17, 0xde, 0xc7, 0x37, 0xcc, 0x3b, 0x3d, 0x7b, 0xff, 0xfa, 0xd5, 0xd8, 0x22,
0x86, 0xbe, 0xde, 0x54, 0x5a, 0x5b, 0x9d, 0xbb, 0xb2, 0x58, 0x3a, 0xa6, 0xcd, 0x0a, 0xf6, 0x68, 0xce, 0x20, 0xab, 0xf2, 0x37, 0x56, 0xf4, 0x31, 0xd4, 0x38, 0x8a, 0x74, 0x29, 0xcb, 0x81, 0x5a,
0x16, 0x21, 0xdb, 0x2d, 0x4d, 0xe4, 0x2a, 0x37, 0x8e, 0xa0, 0x2e, 0x17, 0xa8, 0x0a, 0xd5, 0x61, 0xff, 0x0e, 0xa4, 0x78, 0x56, 0xea, 0x9c, 0x6f, 0x04, 0xaa, 0x8a, 0xa0, 0x0f, 0x81, 0x0a, 0xe9,
0xd7, 0x7c, 0x39, 0xe8, 0x0d, 0xb5, 0x35, 0xba, 0x07, 0xdb, 0xc3, 0x11, 0xeb, 0x75, 0xcd, 0xde, 0x73, 0xe9, 0xa9, 0x9d, 0x4b, 0x3f, 0x4c, 0xbc, 0x30, 0xf3, 0x21, 0xae, 0xc1, 0x2c, 0xc5, 0x4c,
0x13, 0xeb, 0xed, 0x0b, 0x66, 0x9d, 0xf6, 0x5f, 0x3f, 0x7f, 0x36, 0xd4, 0x88, 0xd1, 0x4d, 0x55, 0x4a, 0x62, 0x24, 0xa8, 0x0b, 0x16, 0x46, 0xf3, 0xdb, 0xda, 0x6d, 0xa5, 0x6d, 0x62, 0x34, 0xd7,
0xf6, 0xc2, 0x8a, 0x3e, 0x80, 0x6a, 0x8c, 0x3c, 0x99, 0x8a, 0xf2, 0x42, 0x7b, 0x7f, 0x5f, 0x28, 0x95, 0xc7, 0x50, 0x0f, 0x7d, 0x39, 0x5b, 0x20, 0x17, 0xb6, 0xa1, 0xba, 0xb2, 0xf5, 0xae, 0x2e,
0xc3, 0x59, 0xc9, 0x33, 0xbe, 0x12, 0xd8, 0xc8, 0x00, 0x7a, 0x0f, 0x28, 0x17, 0x76, 0x2c, 0xac, 0xfc, 0x29, 0x2e, 0x47, 0xb9, 0x80, 0x6d, 0x94, 0xf4, 0x08, 0xaa, 0x8b, 0x20, 0x92, 0xf9, 0x5b,
0xac, 0xaf, 0xc2, 0xf6, 0x23, 0xcb, 0x4f, 0x7d, 0x48, 0x4b, 0x61, 0x5a, 0x86, 0x8c, 0x4a, 0xc0, 0x9b, 0xfd, 0x83, 0xbf, 0x97, 0x7b, 0x96, 0x91, 0x2c, 0xd7, 0x38, 0x43, 0x30, 0xb5, 0xe1, 0xe8,
0xe4, 0xb4, 0x05, 0x1a, 0x06, 0xee, 0x32, 0x77, 0x3d, 0xe3, 0x6e, 0x61, 0xe0, 0xca, 0xcc, 0x23, 0xd3, 0xff, 0xbf, 0x33, 0xfd, 0xc2, 0x9c, 0x6b, 0xd8, 0x3b, 0x5d, 0xa4, 0xd1, 0x97, 0xec, 0x71,
0xa8, 0xf9, 0xb6, 0x70, 0x26, 0x18, 0xf3, 0x62, 0x9a, 0x75, 0x39, 0xd5, 0xc0, 0x1e, 0xe3, 0xd4, 0xb4, 0xad, 0x3e, 0x87, 0xe6, 0x2c, 0x87, 0xbd, 0x5b, 0x96, 0xf7, 0x75, 0xcb, 0xa2, 0xb0, 0x70,
0xcc, 0x09, 0x6c, 0xc1, 0xa4, 0x87, 0xb0, 0x31, 0xf1, 0x02, 0x91, 0xbf, 0xa7, 0xda, 0xd9, 0xfd, 0xbd, 0x33, 0xd3, 0x53, 0x7a, 0x08, 0x66, 0x76, 0x46, 0x2b, 0x2f, 0x88, 0xe6, 0x78, 0x5d, 0xec,
0xb3, 0xb9, 0xfd, 0x14, 0x64, 0x39, 0xc7, 0xe8, 0x81, 0x2a, 0x5d, 0x8e, 0x3e, 0xfa, 0xff, 0xaf, 0x09, 0x14, 0xf4, 0x32, 0x43, 0x4e, 0xf6, 0x6f, 0xd6, 0x1d, 0xf2, 0x63, 0xdd, 0x21, 0xbf, 0xd6,
0x4f, 0x9e, 0x22, 0xe3, 0x12, 0xb6, 0x4f, 0x27, 0x49, 0xf0, 0x21, 0x7d, 0x1c, 0xa9, 0xab, 0x8f, 0x1d, 0xf2, 0x69, 0x27, 0xf3, 0x4d, 0xa6, 0xd3, 0x1d, 0xf5, 0x1f, 0x3d, 0xf9, 0x1d, 0x00, 0x00,
0x61, 0xcb, 0xc9, 0xcb, 0xd6, 0x92, 0xe5, 0x1d, 0xd9, 0xb2, 0x10, 0x16, 0xae, 0x37, 0x1d, 0x79, 0xff, 0xff, 0x9b, 0x9a, 0xff, 0xe0, 0x86, 0x03, 0x00, 0x00,
0x4b, 0x1b, 0xa0, 0xa6, 0x63, 0x34, 0xb3, 0xbc, 0xc0, 0xc5, 0xcb, 0xa2, 0x4f, 0x90, 0x95, 0x9e,
0xa6, 0x95, 0x93, 0x9d, 0xab, 0xf9, 0x01, 0xf9, 0x3e, 0x3f, 0x20, 0x3f, 0xe7, 0x07, 0xe4, 0xdd,
0x66, 0xea, 0x1b, 0x8d, 0xc7, 0x9b, 0xd9, 0xdf, 0xe5, 0xe1, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff,
0xf1, 0x65, 0x72, 0x0c, 0x9c, 0x04, 0x00, 0x00,
} }
func (m *WriteRequest) Marshal() (dAtA []byte, err error) { func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
@ -599,109 +489,15 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized) i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized)
} }
if len(m.Metadata) > 0 { if len(m.Symbols2) > 0 {
for iNdEx := len(m.Metadata) - 1; iNdEx >= 0; iNdEx-- { for iNdEx := len(m.Symbols2) - 1; iNdEx >= 0; iNdEx-- {
{ i -= len(m.Symbols2[iNdEx])
size, err := m.Metadata[iNdEx].MarshalToSizedBuffer(dAtA[:i]) copy(dAtA[i:], m.Symbols2[iNdEx])
if err != nil { i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols2[iNdEx])))
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i-- i--
dAtA[i] = 0x1a dAtA[i] = 0x2a
} }
} }
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestLen) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestLen) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestLen) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 { if len(m.Symbols) > 0 {
i -= len(m.Symbols) i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols) copy(dAtA[i:], m.Symbols)
@ -988,30 +784,6 @@ func encodeVarintRemote(dAtA []byte, offset int, v uint64) int {
return base return base
} }
func (m *WriteRequest) Size() (n int) { func (m *WriteRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if len(m.Metadata) > 0 {
for _, e := range m.Metadata {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *MinimizedWriteRequest) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
} }
@ -1027,28 +799,12 @@ func (m *MinimizedWriteRequest) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovRemote(uint64(l)) n += 1 + l + sovRemote(uint64(l))
} }
if m.XXX_unrecognized != nil { if len(m.Symbols2) > 0 {
n += len(m.XXX_unrecognized) for _, s := range m.Symbols2 {
} l = len(s)
return n
}
func (m *MinimizedWriteRequestLen) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l)) n += 1 + l + sovRemote(uint64(l))
} }
} }
l = len(m.Symbols)
if l > 0 {
n += 1 + l + sovRemote(uint64(l))
}
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -1234,125 +990,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Metadata = append(m.Metadata, MetricMetadata{})
if err := m.Metadata[len(m.Metadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeries{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4: case 4:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
@ -1385,62 +1022,11 @@ func (m *MinimizedWriteRequest) Unmarshal(dAtA []byte) error {
} }
m.Symbols = string(dAtA[iNdEx:postIndex]) m.Symbols = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
default: case 5:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequestLen) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestLen: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestLen: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Symbols2", wireType)
} }
var msglen int var stringLen uint64
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
if shift >= 64 { if shift >= 64 {
return ErrIntOverflowRemote return ErrIntOverflowRemote
@ -1450,59 +1036,23 @@ func (m *MinimizedWriteRequestLen) Unmarshal(dAtA []byte) error {
} }
b := dAtA[iNdEx] b := dAtA[iNdEx]
iNdEx++ iNdEx++
msglen |= int(b&0x7F) << shift stringLen |= uint64(b&0x7F) << shift
if b < 0x80 { if b < 0x80 {
break break
} }
} }
if msglen < 0 { intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRemote return ErrInvalidLengthRemote
} }
postIndex := iNdEx + msglen postIndex := iNdEx + intStringLen
if postIndex < 0 { if postIndex < 0 {
return ErrInvalidLengthRemote return ErrInvalidLengthRemote
} }
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesLen{}) m.Symbols2 = append(m.Symbols2, string(dAtA[iNdEx:postIndex]))
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols[:0], dAtA[iNdEx:postIndex]...)
if m.Symbols == nil {
m.Symbols = []byte{}
}
iNdEx = postIndex iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex


@ -20,35 +20,14 @@ import "types.proto";
import "gogoproto/gogo.proto"; import "gogoproto/gogo.proto";
message WriteRequest { message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; repeated TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2; reserved 2;
repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
}
message MinimizedWriteRequest {
repeated MinimizedTimeSeries timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
// Metadata (3) has moved to be part of the TimeSeries type
reserved 3; reserved 3;
// The symbols table. All symbols are concatenated strings. To read the symbols table, it's required // The symbols table. All symbols are concatenated strings. To read the symbols table, it's required
// to know the offset:length range of the actual symbol to read from this string. // to know the offset:length range of the actual symbol to read from this string.
string symbols = 4; string symbols = 4;
} repeated string symbols2 = 5; // TEST.
message MinimizedWriteRequestLen {
repeated MinimizedTimeSeriesLen timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
// Metadata (3) has moved to be part of the TimeSeries type
reserved 3;
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
bytes symbols = 4;
} }
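
The WriteRequest message above now carries interned strings in one of two experimental layouts: symbols, one concatenated string addressed by (offset, length) pairs, and symbols2, a plain repeated string addressed by element index. A minimal sketch of resolving a reference under each layout; the helper names are illustrative and not part of the patch:

package main

import "fmt"

// resolveRange reads a symbol from the concatenated `symbols` string given
// its offset and length, as described in the field comment above.
func resolveRange(symbols string, off, length uint32) string {
	return symbols[off : off+length]
}

// resolveIndex reads a symbol from the `symbols2` string array given its index.
func resolveIndex(symbols []string, idx uint32) string {
	return symbols[idx]
}

func main() {
	concat := "__name__" + "test_metric1"
	fmt.Println(resolveRange(concat, 0, 8), resolveRange(concat, 8, 12)) // __name__ test_metric1

	table := []string{"__name__", "test_metric1"}
	fmt.Println(resolveIndex(table, 1)) // test_metric1
}
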
// ReadRequest represents a remote read request. // ReadRequest represents a remote read request.

File diff suppressed because it is too large.


@ -18,7 +18,7 @@ option go_package = "prompb";
import "gogoproto/gogo.proto"; import "gogoproto/gogo.proto";
message MetricMetadata { message Metadata {
enum MetricType { enum MetricType {
UNKNOWN = 0; UNKNOWN = 0;
COUNTER = 1; COUNTER = 1;
@ -33,9 +33,8 @@ message MetricMetadata {
// Represents the metric type, these match the set from Prometheus. // Represents the metric type, these match the set from Prometheus.
// Refer to model/textparse/interface.go for details. // Refer to model/textparse/interface.go for details.
MetricType type = 1; MetricType type = 1;
string metric_family_name = 2; string help = 2; // TODO(bwplotka): Move to symbols?
string help = 4; string unit = 3; // TODO(bwplotka): Move to symbols?
string unit = 5;
} }
message Sample { message Sample {
@ -47,14 +46,13 @@ message Sample {
message Exemplar { message Exemplar {
// Optional, can be empty. // Optional, can be empty.
repeated Label labels = 1 [(gogoproto.nullable) = false]; repeated Label labels = 1 [(gogoproto.nullable) = false]; // TODO(bwplotka): Move to symbols?
double value = 2; double value = 2;
// timestamp is in ms format, see model/timestamp/timestamp.go for // timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp. // conversion from time.Time to Prometheus timestamp.
int64 timestamp = 3; int64 timestamp = 3;
} }
// A native histogram, also known as a sparse histogram. // A native histogram, also known as a sparse histogram.
// Original design doc: // Original design doc:
// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit // https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
@ -123,40 +121,26 @@ message BucketSpan {
// TimeSeries represents samples and labels for a single time series. // TimeSeries represents samples and labels for a single time series.
message TimeSeries { message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars // For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required. // to be ingested by the remote system properly, the labels or label symbols field is required.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false]; // Deprecated in 1.1, use interned label symbols instead.
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
}
// based on an experiment by marco
message MinimizedTimeSeries {
// Sorted list of label name-value pair references. This list's len is always multiple of 4, // Sorted list of label name-value pair references. This list's len is always multiple of 4,
// packing tuples of (label name offset, label name length, label value offset, label value length). // packing tuples of (label name offset, label name length, label value offset, label value length).
// Offsets point to the symbol table in the higher level MinimizedWriteRequestLen. // Offsets point to the WriteRequest.symbols.
repeated uint32 label_symbols = 1 [(gogoproto.nullable) = false]; repeated uint32 label_symbols = 10;
// Sorted by time, oldest sample first. // Sorted by time, oldest sample first.
// TODO: support references for other types
repeated Sample samples = 2 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata Metadata metadata = 5;
} // Optional created timestamp for the metric in ms format,
// if the first sample in samples does not contain 0 value.
message MinimizedTimeSeriesLen { // See model/timestamp/timestamp.go for conversion from time.Time
// Sorted list of label name-value pair references, encoded as 32bit uint. This // to Prometheus timestamp.
// list's real len is always multiple of 2, label name offset/label value offset. int64 created_timestamp = 6;
// Offsets point to the symbol table in the higher level MinimizedWriteRequestLen.
repeated fixed32 label_symbols = 1;
// Sorted by time, oldest sample first.
// TODO: support references for other types
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
} }
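
As the comment above notes, under the concatenated-string layout label_symbols always has a length that is a multiple of four, packing (name offset, name length, value offset, value length) tuples that point into WriteRequest.symbols. A simplified stand-in for the patch's Uint32RefToLabels that walks that layout back into label pairs (illustrative names, plain string tuples instead of labels.Labels):

package main

import "fmt"

// labelsFromSymbolRefs walks label_symbols in steps of four, following the
// documented layout: name offset, name length, value offset, value length,
// all pointing into the request-level symbols string.
func labelsFromSymbolRefs(symbols string, refs []uint32) [][2]string {
	out := make([][2]string, 0, len(refs)/4)
	for i := 0; i+3 < len(refs); i += 4 {
		name := symbols[refs[i] : refs[i]+refs[i+1]]
		value := symbols[refs[i+2] : refs[i+2]+refs[i+3]]
		out = append(out, [2]string{name, value})
	}
	return out
}

func main() {
	// "__name__" at 0:8, "test_metric1" at 8:12, "job" at 20:3, "node" at 23:4.
	symbols := "__name__test_metric1jobnode"
	refs := []uint32{0, 8, 8, 12, 20, 3, 23, 4}
	fmt.Println(labelsFromSymbolRefs(symbols, refs))
	// [[__name__ test_metric1] [job node]]
}
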
message Label { message Label {


@ -43,11 +43,11 @@ for dir in ${DIRS}; do
protoc --gogofast_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,paths=source_relative:. -I=. \ protoc --gogofast_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,paths=source_relative:. -I=. \
-I="${GOGOPROTO_PATH}" \ -I="${GOGOPROTO_PATH}" \
./io/prometheus/client/*.proto ./io/prometheus/client/*.proto
sed -i.bak -E 's/import _ \"github.com\/gogo\/protobuf\/gogoproto\"//g' -- *.pb.go gsed -i.bak -E 's/import _ \"github.com\/gogo\/protobuf\/gogoproto\"//g' -- *.pb.go
sed -i.bak -E 's/import _ \"google\/protobuf\"//g' -- *.pb.go gsed -i.bak -E 's/import _ \"google\/protobuf\"//g' -- *.pb.go
sed -i.bak -E 's/\t_ \"google\/protobuf\"//g' -- *.pb.go gsed -i.bak -E 's/\t_ \"google\/protobuf\"//g' -- *.pb.go
sed -i.bak -E 's/golang\/protobuf\/descriptor/gogo\/protobuf\/protoc-gen-gogo\/descriptor/g' -- *.go gsed -i.bak -E 's/golang\/protobuf\/descriptor/gogo\/protobuf\/protoc-gen-gogo\/descriptor/g' -- *.go
sed -i.bak -E 's/golang\/protobuf/gogo\/protobuf/g' -- *.go gsed -i.bak -E 's/golang\/protobuf/gogo\/protobuf/g' -- *.go
rm -f -- *.bak rm -f -- *.bak
goimports -w ./*.go ./io/prometheus/client/*.go goimports -w ./*.go ./io/prometheus/client/*.go
popd popd


@ -15,7 +15,6 @@ package remote
import ( import (
"compress/gzip" "compress/gzip"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -24,7 +23,6 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"unsafe"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/golang/snappy" "github.com/golang/snappy"
@ -787,44 +785,17 @@ func labelsToUint32Slice(lbls labels.Labels, symbolTable *rwSymbolTable, buf []u
return result return result
} }
func labelsToUint32SliceLen(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 { func labelsToUint32Slice2(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
result := buf[:0] result := buf[:0]
lbls.Range(func(l labels.Label) { lbls.Range(func(l labels.Label) {
off := symbolTable.RefLen(l.Name) off := symbolTable.Ref2(l.Name)
result = append(result, off) result = append(result, off)
off = symbolTable.RefLen(l.Value) off = symbolTable.Ref2(l.Value)
result = append(result, off) result = append(result, off)
}) })
return result return result
} }
func Uint32LenRefToLabels(symbols []byte, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
labelIdx := 0
for labelIdx < len(minLabels) {
// todo, check for overflow?
offset := minLabels[labelIdx]
labelIdx++
length, n := binary.Uvarint(symbols[offset:])
offset += uint32(n)
name := symbols[offset : uint64(offset)+length]
offset = minLabels[labelIdx]
labelIdx++
length, n = binary.Uvarint(symbols[offset:])
offset += uint32(n)
value := symbols[offset : uint64(offset)+length]
ls.Add(yoloString(name), yoloString(value))
}
return ls.Labels()
}
func yoloString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
func Uint32RefToLabels(symbols string, minLabels []uint32) labels.Labels { func Uint32RefToLabels(symbols string, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2) ls := labels.NewScratchBuilder(len(minLabels) / 2)
@ -848,15 +819,32 @@ func Uint32RefToLabels(symbols string, minLabels []uint32) labels.Labels {
return ls.Labels() return ls.Labels()
} }
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum. func Uint32RefToLabels2(symbols []string, minLabels []uint32) labels.Labels {
func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType { ls := labels.NewScratchBuilder(len(minLabels))
mt := strings.ToUpper(string(t))
v, ok := prompb.MetricMetadata_MetricType_value[mt] labelIdx := 0
if !ok { for labelIdx < len(minLabels) {
return prompb.MetricMetadata_UNKNOWN // todo, check for overflow?
name := symbols[labelIdx]
labelIdx++
// todo, check for overflow?
value := symbols[labelIdx]
labelIdx++
ls.Add(name, value)
} }
return prompb.MetricMetadata_MetricType(v) return ls.Labels()
}
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.Metadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.Metadata_MetricType_value[mt]
if !ok {
return prompb.Metadata_UNKNOWN
}
return prompb.Metadata_MetricType(v)
} }
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
@ -933,71 +921,30 @@ func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
return otlpReq, nil return otlpReq, nil
} }
// DecodeMinimizedWriteRequest from an io.Reader into a prompb.WriteRequest, handling // SymbolizeLabels symbolizes all fields that use label symbols as labels (in place).
// snappy decompression. func SymbolizeLabels(req *prompb.WriteRequest) (*prompb.WriteRequest, error) {
func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) { if len(req.GetSymbols()) > 0 {
compressed, err := io.ReadAll(r) for i, s := range req.Timeseries {
if err != nil { Uint32RefToLabels(req.Symbols, s.LabelSymbols).Range(func(l labels.Label) {
return nil, err req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
} Name: l.Name,
Value: l.Value,
reqBuf, err := snappy.Decode(nil, compressed) })
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestLen
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
// TODO handle metadata?
}
for i, rts := range redReq.Timeseries {
Uint32RefToLabels(redReq.Symbols, rts.LabelSymbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
}) })
})
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
for j, e := range rts.Exemplars {
exemplars[j].Value = e.Value
exemplars[j].Timestamp = e.Timestamp
exemplars[j].Labels = e.Labels
} }
return req, nil
req.Timeseries[i].Samples = rts.Samples
req.Timeseries[i].Exemplars = exemplars
req.Timeseries[i].Histograms = rts.Histograms
} }
return req, nil if len(req.GetSymbols2()) > 0 {
for i, s := range req.Timeseries {
Uint32RefToLabels2(req.Symbols2, s.LabelSymbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
})
}
return req, nil
}
return nil, errors.New("no symbols detected")
} }
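
SymbolizeLabels gives a receiver a single code path for both symbolized layouts: after DecodeWriteRequest, the interned references are expanded back into plain Label pairs in place. A hedged sketch of such a receiver, assuming the helpers are exported from the remote package exactly as shown in this patch (the handler wiring and port are illustrative):

package main

import (
	"net/http"

	"github.com/prometheus/prometheus/storage/remote"
)

// handleWrite sketches a receiver using the helpers touched by this patch:
// DecodeWriteRequest handles snappy plus proto decoding, SymbolizeLabels then
// rebuilds Labels in place from LabelSymbols and Symbols/Symbols2.
func handleWrite(w http.ResponseWriter, r *http.Request) {
	req, err := remote.DecodeWriteRequest(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Only symbolized requests carry a symbols table; plain requests already
	// have Labels set and SymbolizeLabels would report "no symbols detected".
	if len(req.Symbols) > 0 || len(req.Symbols2) > 0 {
		if req, err = remote.SymbolizeLabels(req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
	}
	for _, ts := range req.Timeseries {
		_ = ts.Labels // plain label pairs, regardless of the wire layout used
	}
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	http.Handle("/api/v1/write", http.HandlerFunc(handleWrite))
	_ = http.ListenAndServe(":8080", nil)
}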


@ -75,7 +75,7 @@ var writeRequestFixture = &prompb.WriteRequest{
} }
// writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation. // writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation.
var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest { var writeRequestMinimizedFixture = func() *prompb.WriteRequest {
st := newRwSymbolTable() st := newRwSymbolTable()
var labels []uint32 var labels []uint32
for _, s := range []string{ for _, s := range []string{
@ -88,8 +88,41 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
off, length := st.Ref(s) off, length := st.Ref(s)
labels = append(labels, off, length) labels = append(labels, off, length)
} }
return &prompb.MinimizedWriteRequest{ return &prompb.WriteRequest{
Timeseries: []prompb.MinimizedTimeSeries{ Timeseries: []prompb.TimeSeries{
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
},
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
},
},
Symbols: st.LabelsString(),
}
}()
// writeRequestMinimizedFixture2 represents the same request as writeRequestFixture, but using the minimized representation and array symbol.
var writeRequestMinimizedFixture2 = func() *prompb.WriteRequest {
st := newRwSymbolTable()
var labels []uint32
for _, s := range []string{
"__name__", "test_metric1",
"b", "c",
"baz", "qux",
"d", "e",
"foo", "bar",
} {
off, _ := st.Ref(s)
labels = append(labels, off)
}
return &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{ {
LabelSymbols: labels, LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
@ -522,22 +555,22 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
tc := []struct { tc := []struct {
desc string desc string
input textparse.MetricType input textparse.MetricType
expected prompb.MetricMetadata_MetricType expected prompb.Metadata_MetricType
}{ }{
{ {
desc: "with a single-word metric", desc: "with a single-word metric",
input: textparse.MetricTypeCounter, input: textparse.MetricTypeCounter,
expected: prompb.MetricMetadata_COUNTER, expected: prompb.Metadata_COUNTER,
}, },
{ {
desc: "with a two-word metric", desc: "with a two-word metric",
input: textparse.MetricTypeStateset, input: textparse.MetricTypeStateset,
expected: prompb.MetricMetadata_STATESET, expected: prompb.Metadata_STATESET,
}, },
{ {
desc: "with an unknown metric", desc: "with an unknown metric",
input: "not-known", input: "not-known",
expected: prompb.MetricMetadata_UNKNOWN, expected: prompb.Metadata_UNKNOWN,
}, },
} }
@ -550,7 +583,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
} }
func TestDecodeWriteRequest(t *testing.T) { func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
require.NoError(t, err) require.NoError(t, err)
actual, err := DecodeWriteRequest(bytes.NewReader(buf)) actual, err := DecodeWriteRequest(bytes.NewReader(buf))
@ -563,7 +596,17 @@ func TestDecodeMinWriteRequest(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
actual, err := DecodeMinimizedWriteRequest(bytes.NewReader(buf)) actual, err := DecodeWriteRequest(bytes.NewReader(buf))
require.NoError(t, err)
require.Equal(t, writeRequestMinimizedFixture, actual)
}
func TestDecodeMinWriteRequest2(t *testing.T) {
buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
require.NoError(t, err)
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, writeRequestMinimizedFixture, actual) require.Equal(t, writeRequestMinimizedFixture, actual)
} }
@ -887,10 +930,18 @@ func (c *mockChunkIterator) Err() error {
return nil return nil
} }
func TestLenFormat(t *testing.T) { func TestFormat1(t *testing.T) {
r := rwSymbolTable{} r := newRwSymbolTable()
ls := labels.FromStrings("asdf", "qwer", "zxcv", "1234") ls := labels.FromStrings("asdf", "qwer", "zxcv", "1234")
encoded := labelsToUint32SliceLen(ls, &r, nil) encoded := labelsToUint32Slice(ls, &r, nil)
decoded := Uint32LenRefToLabels(r.LabelsData(), encoded) decoded := Uint32RefToLabels(r.LabelsString(), encoded)
require.Equal(t, ls, decoded)
}
func TestFormat2(t *testing.T) {
r := newRwSymbolTable()
ls := labels.FromStrings("asdf", "qwer", "zxcv", "1234")
encoded := labelsToUint32Slice2(ls, &r, nil)
decoded := Uint32RefToLabels2(r.LabelsStrings(), encoded)
require.Equal(t, ls, decoded) require.Equal(t, ls, decoded)
} }
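
TestFormat1 and TestFormat2 exercise the two reference schemes end to end: Ref returns an (offset, length) pair into the concatenated table exposed by LabelsString, while Ref2 returns an index into the slice exposed by LabelsStrings. rwSymbolTable itself is not shown in this diff, so the sketch below is only an illustrative interning table with the same two shapes, assuming both deduplicate repeated strings:

package main

import "fmt"

// internTable is an illustrative stand-in for rwSymbolTable with both
// reference shapes: Ref interns into one concatenated string and returns
// (offset, length); Ref2 interns into a string slice and returns its index.
type internTable struct {
	concat  string
	offs    map[string]uint32
	list    []string
	indexes map[string]uint32
}

func newInternTable() *internTable {
	return &internTable{offs: map[string]uint32{}, indexes: map[string]uint32{}}
}

func (t *internTable) Ref(s string) (off, length uint32) {
	if o, ok := t.offs[s]; ok {
		return o, uint32(len(s))
	}
	off = uint32(len(t.concat))
	t.offs[s] = off
	t.concat += s
	return off, uint32(len(s))
}

func (t *internTable) Ref2(s string) uint32 {
	if i, ok := t.indexes[s]; ok {
		return i
	}
	i := uint32(len(t.list))
	t.indexes[s] = i
	t.list = append(t.list, s)
	return i
}

func (t *internTable) LabelsString() string    { return t.concat }
func (t *internTable) LabelsStrings() []string { return t.list }

func main() {
	t := newInternTable()
	off, l := t.Ref("__name__")
	fmt.Println(off, l)                              // 0 8
	fmt.Println(t.Ref2("__name__"), t.Ref2("up"))    // 0 1
	fmt.Println(t.LabelsString(), t.LabelsStrings()) // __name__ [__name__ up]
}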


@ -15,7 +15,6 @@ package remote
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"math" "math"
"strconv" "strconv"
@ -393,9 +392,9 @@ type WriteClient interface {
type RemoteWriteFormat int64 type RemoteWriteFormat int64
const ( const (
Base1 RemoteWriteFormat = iota // original map based format Base1 RemoteWriteFormat = iota // original map based format
Min32Optimized // two 32bit varint plus marshalling optimization Min32Optimized // two 32bit varint plus marshalling optimization
MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice) Min32OptimizedArray // 1 32bit varint with array plus marshalling optimization
) )
// QueueManager manages a queue of samples to be sent to the Storage // QueueManager manages a queue of samples to be sent to the Storage
@ -517,13 +516,13 @@ func NewQueueManager(
// AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized. // AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) { func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
mm := make([]prompb.MetricMetadata, 0, len(metadata)) mm := make([]prompb.Metadata, 0, len(metadata))
for _, entry := range metadata { for _, entry := range metadata {
mm = append(mm, prompb.MetricMetadata{ mm = append(mm, prompb.Metadata{
MetricFamilyName: entry.Metric, //MetricFamilyName: entry.Metric,
Help: entry.Help, Help: entry.Help,
Type: metricTypeToMetricTypeProto(entry.Type), Type: metricTypeToMetricTypeProto(entry.Type),
Unit: entry.Unit, Unit: entry.Unit,
}) })
} }
@ -542,9 +541,10 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
} }
} }
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.Metadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples. // Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil) // TODO(bwplotka): Metadata support.
req, _, err := buildWriteRequest(nil, pBuf, nil)
if err != nil { if err != nil {
return err return err
} }
@ -1371,7 +1371,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
max += int(float64(max) * 0.1) max += int(float64(max) * 0.1)
} }
// TODO we should make an interface for the timeseries type
batchQueue := queue.Chan() batchQueue := queue.Chan()
pendingData := make([]prompb.TimeSeries, max) pendingData := make([]prompb.TimeSeries, max)
for i := range pendingData { for i := range pendingData {
@ -1381,16 +1380,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
pendingMinimizedData := make([]prompb.MinimizedTimeSeries, max)
for i := range pendingMinimizedData {
pendingMinimizedData[i].Samples = []prompb.Sample{{}}
}
pendingMinLenData := make([]prompb.MinimizedTimeSeriesLen, max)
for i := range pendingMinLenData {
pendingMinLenData[i].Samples = []prompb.Sample{{}}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() { stop := func() {
if !timer.Stop() { if !timer.Stop() {
@ -1431,14 +1420,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
case Min32Optimized: case Min32Optimized:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) s.sendMinSamples(ctx, pendingData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear() symbolTable.clear()
case MinLen: case Min32OptimizedArray:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms) nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries2(&symbolTable, batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendMinSamples2(ctx, pendingData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear() symbolTable.clear()
} }
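The switch above dispatches on the shard's configured remote write format. The constant names Base1, Min32Optimized and Min32OptimizedArray all appear in this diff, but the declaration itself does not; the following is only a sketch of what it is assumed to look like in package remote (concrete type and ordering are assumptions):

package remote

// Sketch only: constant names taken from the switches in this diff.
type RemoteWriteFormat int64

const (
	Base1               RemoteWriteFormat = iota // classic remote write 1.0 payload
	Min32Optimized                               // interned labels, symbols packed into a single string
	Min32OptimizedArray                          // interned labels, symbols sent as a []string table
)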
@@ -1458,11 +1447,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
case Min32Optimized: case Min32Optimized:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) s.sendMinSamples(ctx, pendingData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear() symbolTable.clear()
} }
} }
@@ -1517,14 +1506,14 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// Build the WriteRequest with no metadata. // Build the WriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails. // only error if marshaling the proto to bytes fails.
req, highest, err := buildWriteRequest(samples, nil, pBuf, buf) req, highest, err := buildWriteRequest(samples, pBuf, buf)
if err == nil { if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
} }
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) { func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.TimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
begin := time.Now() begin := time.Now()
// Build the ReducedWriteRequest with no metadata. // Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
@@ -1536,12 +1525,12 @@ func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedT
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { func (s *shards) sendMinSamples2(ctx context.Context, samples []prompb.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
begin := time.Now() begin := time.Now()
// Build the ReducedWriteRequest with no metadata. // Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails. // only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, buf) req, highest, err := buildMinimizedWriteRequest2(samples, labels, pBuf, buf)
if err == nil { if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
} }
@@ -1630,7 +1619,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
return err return err
} }
func populateMinimizedTimeSeries(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) { func populateMinimizedTimeSeries(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@@ -1672,7 +1661,7 @@ func populateMinimizedTimeSeries(symbolTable *rwSymbolTable, batch []timeSeries,
return nPendingSamples, nPendingExemplars, nPendingHistograms return nPendingSamples, nPendingExemplars, nPendingHistograms
} }
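Both populate functions turn each series' labels into symbol references instead of prompb.Label pairs; the helper labelsToUint32Slice2 used below is not part of this diff. A hypothetical sketch of the array-indexed variant, assuming alternating name/value references produced by the rwSymbolTable.Ref2 interning defined further down in this file:

// Hypothetical sketch (the real labelsToUint32Slice2 is not shown here):
// intern every label name and value through Ref2 and emit the resulting
// indices as alternating name/value pairs, reusing buf between batches.
// Assumes: import "github.com/prometheus/prometheus/model/labels".
func labelsToUint32Slice2Sketch(lbls labels.Labels, t *rwSymbolTable, buf []uint32) []uint32 {
	refs := buf[:0]
	lbls.Range(func(l labels.Label) {
		refs = append(refs, t.Ref2(l.Name), t.Ref2(l.Value))
	})
	return refs
}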
func populateMinimizedTimeSeriesLen(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesLen, sendExemplars, sendNativeHistograms bool) (int, int, int) { func populateMinimizedTimeSeries2(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@@ -1688,7 +1677,7 @@ func populateMinimizedTimeSeriesLen(symbolTable *rwSymbolTable, batch []timeSeri
// stop reading from the queue. This makes it safe to reference pendingSamples by index. // stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) // pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceLen(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols) pendingData[nPending].LabelSymbols = labelsToUint32Slice2(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType { switch d.sType {
case tSample: case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
@@ -1760,7 +1749,7 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
} }
} }
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) { func buildWriteRequest(samples []prompb.TimeSeries, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64 var highest int64
for _, ts := range samples { for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it. // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1777,7 +1766,6 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
req := &prompb.WriteRequest{ req := &prompb.WriteRequest{
Timeseries: samples, Timeseries: samples,
Metadata: metadata,
} }
if pBuf == nil { if pBuf == nil {
@@ -1812,19 +1800,16 @@ type offLenPair struct {
} }
type rwSymbolTable struct { type rwSymbolTable struct {
symbols []byte symbols []byte
symbolsMap map[string]offLenPair symbolsMap map[string]offLenPair
symbolsMap64Packed map[string]uint64 symbols2 []string
symbolsMap32Packed map[string]uint32 symbols2MapID map[string]uint32
symbolsMapBytes map[string]uint32
} }
func newRwSymbolTable() rwSymbolTable { func newRwSymbolTable() rwSymbolTable {
return rwSymbolTable{ return rwSymbolTable{
symbolsMap: make(map[string]offLenPair), symbolsMap: make(map[string]offLenPair),
symbolsMap64Packed: make(map[string]uint64), symbols2MapID: make(map[string]uint32),
symbolsMap32Packed: make(map[string]uint32),
symbolsMapBytes: make(map[string]uint32),
} }
} }
@@ -1837,21 +1822,20 @@ func (r *rwSymbolTable) Ref(str string) (uint32, uint32) {
panic(1) panic(1)
} }
r.symbols = append(r.symbols, str...) r.symbols = append(r.symbols, str...)
if len(r.symbols) < int(off+length) { if len(r.symbols) < int(off) {
panic(2) panic(2)
} }
r.symbolsMap[str] = offLenPair{off, length} r.symbolsMap[str] = offLenPair{off, length}
return off, length return off, length
} }
func (r *rwSymbolTable) RefLen(str string) uint32 { func (r *rwSymbolTable) Ref2(str string) uint32 {
if ref, ok := r.symbolsMapBytes[str]; ok { if ref, ok := r.symbols2MapID[str]; ok {
return ref return ref
} }
ref := uint32(len(r.symbols)) ref := uint32(len(r.symbols2))
r.symbols = binary.AppendUvarint(r.symbols, uint64(len(str))) r.symbols2 = append(r.symbols2, str)
r.symbols = append(r.symbols, str...) r.symbols2MapID[str] = ref
r.symbolsMapBytes[str] = ref
return ref return ref
} }
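Ref2 is plain string interning: the first time a string is seen it is appended to symbols2 and its index recorded in symbols2MapID; every later call returns that same index. A small usage sketch (the indices shown are what the code above would produce for a fresh table):

t := newRwSymbolTable()
name := t.Ref2("__name__")             // 0: first distinct string, appended to symbols2
nameAgain := t.Ref2("__name__")        // 0: served from symbols2MapID, nothing appended
value := t.Ref2("http_requests_total") // 1: second distinct string
_ = nameAgain == name                  // true; symbols2 holds each distinct string exactly once
_ = value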
@@ -1859,27 +1843,23 @@ func (r *rwSymbolTable) LabelsString() string {
return *((*string)(unsafe.Pointer(&r.symbols))) return *((*string)(unsafe.Pointer(&r.symbols)))
} }
func (r *rwSymbolTable) LabelsData() []byte { func (r *rwSymbolTable) LabelsStrings() []string {
return r.symbols return r.symbols2
} }
func (r *rwSymbolTable) clear() { func (r *rwSymbolTable) clear() {
for k := range r.symbolsMap { for k := range r.symbolsMap {
delete(r.symbolsMap, k) delete(r.symbolsMap, k)
} }
for k := range r.symbolsMap64Packed {
delete(r.symbolsMap64Packed, k)
}
for k := range r.symbolsMap32Packed {
delete(r.symbolsMap32Packed, k)
}
for k := range r.symbolsMapBytes {
delete(r.symbolsMapBytes, k)
}
r.symbols = r.symbols[:0] r.symbols = r.symbols[:0]
for k := range r.symbols2MapID {
delete(r.symbols2MapID, k)
}
r.symbols2 = r.symbols2[:0]
} }
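clear empties the lookup maps and truncates the backing slices but deliberately keeps their allocations, so interning the next batch does not reallocate. On Go 1.21+ the same bookkeeping could be written with the clear builtin; a possible simplification, not what this diff does:

func (r *rwSymbolTable) clearAlt() {
	clear(r.symbolsMap)    // removes all entries, like the delete loop above
	clear(r.symbols2MapID)
	r.symbols = r.symbols[:0]   // keep the backing arrays for the next batch
	r.symbols2 = r.symbols2[:0]
}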
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte) ([]byte, int64, error) { func buildMinimizedWriteRequest(samples []prompb.TimeSeries, labels string, pBuf, buf *[]byte) ([]byte, int64, error) {
var highest int64 var highest int64
for _, ts := range samples { for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it. // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1894,7 +1874,7 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
} }
} }
req := &prompb.MinimizedWriteRequest{ req := &prompb.WriteRequest{
Symbols: labels, Symbols: labels,
Timeseries: samples, Timeseries: samples,
} }
@@ -1924,7 +1904,7 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str
return compressed, highest, nil return compressed, highest, nil
} }
func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) { func buildMinimizedWriteRequest2(samples []prompb.TimeSeries, labels []string, pBuf, buf *[]byte) ([]byte, int64, error) {
var highest int64 var highest int64
for _, ts := range samples { for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it. // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@@ -1939,20 +1919,19 @@ func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labe
} }
} }
req := &prompb.MinimizedWriteRequestLen{ req := &prompb.WriteRequest{
Symbols: labels, Symbols2: labels,
Timeseries: samples, Timeseries: samples,
} }
if pBuf == nil { if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient. pBuf = &[]byte{} // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
} }
err := pBuf.Marshal(req) data, err := req.OptimizedMarshal(*pBuf)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the // snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible. // buffer as long as possible.
@@ -1962,8 +1941,8 @@ func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labe
buf = &[]byte{} buf = &[]byte{}
} }
compressed := snappy.Encode(*buf, pBuf.Bytes()) compressed := snappy.Encode(*buf, data)
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) { if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
// grow the buffer for the next time // grow the buffer for the next time
*buf = make([]byte, n) *buf = make([]byte, n)
} }

View file

@@ -200,7 +200,7 @@ func TestMetadataDelivery(t *testing.T) {
// fit into MaxSamplesPerSend. // fit into MaxSamplesPerSend.
require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived) require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived)
// Make sure the last samples were sent. // Make sure the last samples were sent.
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) //require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
} }
func TestSampleDeliveryTimeout(t *testing.T) { func TestSampleDeliveryTimeout(t *testing.T) {
@@ -694,7 +694,7 @@ type TestWriteClient struct {
receivedFloatHistograms map[string][]prompb.Histogram receivedFloatHistograms map[string][]prompb.Histogram
expectedHistograms map[string][]prompb.Histogram expectedHistograms map[string][]prompb.Histogram
expectedFloatHistograms map[string][]prompb.Histogram expectedFloatHistograms map[string][]prompb.Histogram
receivedMetadata map[string][]prompb.MetricMetadata receivedMetadata map[string][]prompb.Metadata
writesReceived int writesReceived int
withWaitGroup bool withWaitGroup bool
wg sync.WaitGroup wg sync.WaitGroup
@@ -708,7 +708,7 @@ func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient {
withWaitGroup: true, withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{}, receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{}, receivedMetadata: map[string][]prompb.Metadata{},
rwFormat: rwFormat, rwFormat: rwFormat,
} }
} }
@@ -825,22 +825,23 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
return err return err
} }
var reqProto *prompb.WriteRequest reqProto := &prompb.WriteRequest{}
switch c.rwFormat { err = proto.Unmarshal(reqBuf, reqProto)
case Base1: if err != nil {
reqProto = &prompb.WriteRequest{} return err
err = proto.Unmarshal(reqBuf, reqProto)
case Min32Optimized:
var reqMin prompb.MinimizedWriteRequest
err = proto.Unmarshal(reqBuf, &reqMin)
if err == nil {
reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin)
}
} }
if err != nil { switch c.rwFormat {
fmt.Println("error: ", err) case Min32Optimized, Min32OptimizedArray:
return err reqProto, err = SymbolizeLabels(reqProto)
if err != nil {
return err
}
} }
count := 0 count := 0
@@ -871,9 +872,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
c.wg.Add(-count) c.wg.Add(-count)
} }
for _, m := range reqProto.Metadata { // TODO(bwplotka): Adjust to new protocol
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) //for _, m := range reqProto.Metadata {
} // c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
//}
c.writesReceived++ c.writesReceived++
@@ -1433,9 +1435,9 @@ func createDummyTimeSeries(instances int) []timeSeries {
b := labels.NewBuilder(commonLabels) b := labels.NewBuilder(commonLabels)
b.Set("pod", "prometheus-"+strconv.Itoa(i)) b.Set("pod", "prometheus-"+strconv.Itoa(i))
for _, lbls := range metrics { for _, lbls := range metrics {
for _, l := range lbls { lbls.Range(func(l labels.Label) {
b.Set(l.Name, l.Value) b.Set(l.Name, l.Value)
} })
result = append(result, timeSeries{ result = append(result, timeSeries{
seriesLabels: b.Labels(), seriesLabels: b.Labels(),
value: r.Float64(), value: r.Float64(),
@@ -1458,14 +1460,14 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
// Warmup buffers // Warmup buffers
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
populateTimeSeries(batch, seriesBuff, true, true) populateTimeSeries(batch, seriesBuff, true, true)
buildWriteRequest(seriesBuff, nil, pBuf, &buff) buildWriteRequest(seriesBuff, pBuf, &buff)
} }
b.ResetTimer() b.ResetTimer()
totalSize := 0 totalSize := 0
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
populateTimeSeries(batch, seriesBuff, true, true) populateTimeSeries(batch, seriesBuff, true, true)
req, _, err := buildWriteRequest(seriesBuff, nil, pBuf, &buff) req, _, err := buildWriteRequest(seriesBuff, pBuf, &buff)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
@@ -1503,7 +1505,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
for _, tc := range testCases { for _, tc := range testCases {
symbolTable := newRwSymbolTable() symbolTable := newRwSymbolTable()
buff := make([]byte, 0) buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch)) seriesBuff := make([]prompb.TimeSeries, len(tc.batch))
for i := range seriesBuff { for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}} seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}} seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
@@ -1512,16 +1514,16 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
// Warmup buffers // Warmup buffers
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) //populateMinimizedTimeSeries2(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) // buildMinimizedWriteRequest2(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff)
} }
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0 totalSize := 0
for j := 0; j < b.N; j++ { for j := 0; j < b.N; j++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) // populateMinimizedTimeSeries2(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer() b.ResetTimer()
req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) //buildMinimizedWriteRequest2(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
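Both benchmarks accumulate the compressed request size in totalSize; if that number should show up in the benchmark output, one option (not part of this diff) is to report it as a custom per-operation metric once the loop finishes:

// After the benchmark loop:
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedBytes/op")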

View file

@@ -43,11 +43,6 @@ type writeHandler struct {
appendable storage.Appendable appendable storage.Appendable
samplesWithInvalidLabelsTotal prometheus.Counter samplesWithInvalidLabelsTotal prometheus.Counter
// Experimental feature, new remote write proto format
// The handler will accept the new format, but it can still accept the old one
// TODO: this should eventually be via content negotiation
rwFormat RemoteWriteFormat
} }
// NewWriteHandler creates a http.Handler that accepts remote write requests and // NewWriteHandler creates a http.Handler that accepts remote write requests and
@@ -56,7 +51,6 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
h := &writeHandler{ h := &writeHandler{
logger: logger, logger: logger,
appendable: appendable, appendable: appendable,
rwFormat: rwFormat,
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus", Namespace: "prometheus",
Subsystem: "api", Subsystem: "api",
@@ -73,35 +67,14 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error var err error
var req *prompb.WriteRequest var req *prompb.WriteRequest
var reqMin *prompb.MinimizedWriteRequest req, err = DecodeWriteRequest(r.Body)
var reqMinLen *prompb.MinimizedWriteRequestLen
// TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat {
case Base1:
req, err = DecodeWriteRequest(r.Body)
case Min32Optimized:
reqMin, err = DecodeMinimizedWriteRequest(r.Body)
case MinLen:
reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body)
}
if err != nil { if err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
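DecodeWriteRequest (unchanged by this diff) is now the only decode path left in ServeHTTP. Roughly, it snappy-decompresses the request body and unmarshals the protobuf; a self-contained sketch of that shape, not the exact implementation:

// Assumed imports: io, github.com/gogo/protobuf/proto,
// github.com/golang/snappy, github.com/prometheus/prometheus/prompb.
func decodeWriteRequestSketch(body io.Reader) (*prompb.WriteRequest, error) {
	compressed, err := io.ReadAll(body)
	if err != nil {
		return nil, err
	}
	raw, err := snappy.Decode(nil, compressed) // nil dst: snappy allocates
	if err != nil {
		return nil, err
	}
	var req prompb.WriteRequest
	if err := proto.Unmarshal(raw, &req); err != nil {
		return nil, err
	}
	return &req, nil
}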
// TODO: this should eventually be done detecting the format version above err = h.write(r.Context(), req)
switch h.rwFormat {
case Base1:
err = h.write(r.Context(), req)
case Min32Optimized:
err = h.writeMin(r.Context(), reqMin)
case MinLen:
err = h.writeMinLen(r.Context(), reqMinLen)
}
switch err { switch err {
case nil: case nil:
case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp: case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp:
@@ -148,6 +121,13 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
err = app.Commit() err = app.Commit()
}() }()
if len(req.GetSymbols()) > 0 || len(req.GetSymbols2()) > 0 {
// 1.1 PRW request: labels arrived as symbol references, resolve them before appending.
req, err = SymbolizeLabels(req)
if err != nil {
return err
}
}
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
ls := labelProtosToLabels(ts.Labels) ls := labelProtosToLabels(ts.Labels)
if !ls.IsValid() { if !ls.IsValid() {
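The SymbolizeLabels helper called in the new block above is not included in this diff. For the []string (Symbols2) flavour, a hypothetical sketch of what it has to do: look every LabelSymbols reference up in the request-level symbol table and rebuild plain prompb.Label pairs, so the rest of write() can keep using labelProtosToLabels unchanged:

// Hypothetical sketch, array-indexed (Symbols2) variant only; assumes
// LabelSymbols holds alternating name/value indices into req.Symbols2.
// Assumed imports: fmt, github.com/prometheus/prometheus/prompb.
func symbolizeLabelsSketch(req *prompb.WriteRequest) (*prompb.WriteRequest, error) {
	for i, ts := range req.Timeseries {
		if len(ts.LabelSymbols)%2 != 0 {
			return nil, fmt.Errorf("odd number of label symbol references")
		}
		lbls := make([]prompb.Label, 0, len(ts.LabelSymbols)/2)
		for j := 0; j < len(ts.LabelSymbols); j += 2 {
			lbls = append(lbls, prompb.Label{
				Name:  req.Symbols2[ts.LabelSymbols[j]],
				Value: req.Symbols2[ts.LabelSymbols[j+1]],
			})
		}
		req.Timeseries[i].Labels = lbls
	}
	return req, nil
}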
@@ -293,7 +273,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
func (h *writeHandler) writeMin(ctx context.Context, req *prompb.MinimizedWriteRequest) (err error) { func (h *writeHandler) writeMin(ctx context.Context, req *prompb.WriteRequest) (err error) {
outOfOrderExemplarErrs := 0 outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx) app := h.appendable.Appender(ctx)
@@ -330,41 +310,3 @@ func (h *writeHandler) writeMin(ctx context.Context, req *prompb.MinimizedWriteR
return nil return nil
} }
func (h *writeHandler) writeMinLen(ctx context.Context, req *prompb.MinimizedWriteRequestLen) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32LenRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}

View file

@@ -38,7 +38,7 @@ import (
) )
func TestRemoteWriteHandler(t *testing.T) { func TestRemoteWriteHandler(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -138,7 +138,7 @@ func TestOutOfOrderSample(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil) }}, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -164,7 +164,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil) }}, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -188,7 +188,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
}}, nil, nil, nil) }}, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -218,7 +218,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
{Name: "test_label_name_" + num, Value: labelValue + num}, {Name: "test_label_name_" + num, Value: labelValue + num},
}, },
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil) }}, nil, nil)
require.NoError(b, err) require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err) require.NoError(b, err)
@@ -237,7 +237,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
} }
func TestCommitErr(t *testing.T) { func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -275,7 +275,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil)
require.NoError(b, err) require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@@ -288,7 +288,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
var bufRequests [][]byte var bufRequests [][]byte
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil) buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil)
require.NoError(b, err) require.NoError(b, err)
bufRequests = append(bufRequests, buf) bufRequests = append(bufRequests, buf)
} }