remove all new rw formats but the []string one

also adapt tests to the new format

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
Nicolás Pazos 2023-11-29 12:41:16 -03:00
parent 31d3956f47
commit ec9300fc1a
13 changed files with 204 additions and 2965 deletions
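For context on what survives this commit: the only reduced remote-write representation kept is MinimizedWriteRequestStr, where the symbol table is a plain []string and each series refers to labels by pairs of indices into it. A minimal, hand-written sketch of that shape (the local types and values below are illustrative stand-ins, not the generated prompb structs):

package main

import "fmt"

// Stand-ins for prompb.MinimizedWriteRequestStr / MinimizedTimeSeriesStr,
// reduced to the two fields this commit keeps.
type minimizedTimeSeriesStr struct {
    // Flat list of (name index, value index) pairs into Symbols;
    // its length is always a multiple of 2.
    LabelSymbols []uint32
}

type minimizedWriteRequestStr struct {
    Timeseries []minimizedTimeSeriesStr
    Symbols    []string
}

func main() {
    req := minimizedWriteRequestStr{
        Symbols: []string{"__name__", "up", "job", "node"}, // illustrative values
        Timeseries: []minimizedTimeSeriesStr{
            {LabelSymbols: []uint32{0, 1, 2, 3}}, // __name__="up", job="node"
        },
    }
    for _, ts := range req.Timeseries {
        for i := 0; i+1 < len(ts.LabelSymbols); i += 2 {
            fmt.Printf("%s=%q\n", req.Symbols[ts.LabelSymbols[i]], req.Symbols[ts.LabelSymbols[i+1]])
        }
    }
}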


@ -40,134 +40,6 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
return r.Marshal()
}
func (m *MinimizedWriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
dst = make([]byte, siz)
}
dst = dst[:siz]
n, err := m.OptimizedMarshalToSizedBuffer(dst)
if err != nil {
return nil, err
}
return (dst)[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *MinimizedWriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelSymbols in place without extra allocations.
func (m *MinimizedTimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelSymbols) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelSymbols {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
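The packed label_symbols encoding above relies on a small trick worth spelling out: varint bytes are emitted right-to-left (the direction the gogo sized-buffer marshalers write in) and the packed region is reversed once at the end, so no temporary buffer is needed. A standalone sketch of just that trick, not the Prometheus code itself:

package main

import (
    "fmt"
    "slices"
)

// packVarints encodes nums as a protobuf packed-varint payload by writing
// bytes right-to-left into a worst-case sized buffer and reversing the
// written region once at the end.
func packVarints(nums []uint32) []byte {
    buf := make([]byte, 5*len(nums)) // 5 bytes is the worst case per uint32
    i := len(buf)
    start := i
    for _, num := range nums {
        for num >= 1<<7 {
            i--
            buf[i] = byte(num&0x7f | 0x80)
            num >>= 7
        }
        i--
        buf[i] = byte(num)
    }
    slices.Reverse(buf[i:start])
    return buf[i:]
}

func main() {
    fmt.Printf("% x\n", packVarints([]uint32{1, 300, 7})) // 01 ac 02 07
}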
func (m *MinimizedWriteRequestStr) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {


@ -23,49 +23,58 @@ func TestOptimizedMarshal(t *testing.T) {
tests := []struct {
name string
m *MinimizedWriteRequest
m *MinimizedWriteRequestStr
}{
// {
// name: "empty",
// m: &MinimizedWriteRequest{},
// m: &MinimizedWriteRequestStr{},
// },
{
name: "simple",
m: &MinimizedWriteRequest{
m: &MinimizedWriteRequestStr{
Timeseries: []MinimizedTimeSeries{
Timeseries: []MinimizedTimeSeriesStr{
{
LabelSymbols: []uint32{
0, 10,
10, 3,
13, 3,
16, 6,
22, 3,
25, 5,
30, 3,
33, 7,
0, 1,
2, 3,
4, 5,
6, 7,
8, 9,
10, 11,
12, 13,
14, 15,
},
Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: nil,
},
{
LabelSymbols: []uint32{
0, 10,
10, 3,
13, 3,
16, 6,
22, 3,
25, 5,
30, 3,
33, 7,
0, 1,
2, 3,
4, 5,
6, 7,
8, 9,
10, 11,
12, 13,
14, 15,
},
Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []Exemplar{{Labels: []Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: nil,
},
},
// 40 chars
Symbols: "abcdefghijabcdefghijabcdefghijabcdefghij",
Symbols: []string{
"a", "b",
"c", "d",
"e", "f",
"g", "h",
"i", "j",
"k", "l",
"m", "n",
"o", "p",
},
},
},
}
@ -81,7 +90,7 @@ func TestOptimizedMarshal(t *testing.T) {
require.Equal(t, expected, got)
// round trip
m := &MinimizedWriteRequest{}
m := &MinimizedWriteRequestStr{}
require.NoError(t, m.Unmarshal(got))
require.Equal(t, tt.m, m)
})


@ -60,7 +60,7 @@ func (x ReadRequest_ResponseType) String() string {
}
func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5, 0}
return fileDescriptor_eefc82927d57d89b, []int{2, 0}
}
type WriteRequest struct {
@ -118,120 +118,6 @@ func (m *WriteRequest) GetMetadata() []MetricMetadata {
return nil
}
type MinimizedWriteRequest struct {
Timeseries []MinimizedTimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings. To read the symbols table, it's required
// to know the offset:length range of the actual symbol to read from this string.
Symbols string `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequest) Reset() { *m = MinimizedWriteRequest{} }
func (m *MinimizedWriteRequest) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequest) ProtoMessage() {}
func (*MinimizedWriteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{1}
}
func (m *MinimizedWriteRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequest.Merge(m, src)
}
func (m *MinimizedWriteRequest) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequest) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequest.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequest proto.InternalMessageInfo
func (m *MinimizedWriteRequest) GetTimeseries() []MinimizedTimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequest) GetSymbols() string {
if m != nil {
return m.Symbols
}
return ""
}
type MinimizedWriteRequestLen struct {
Timeseries []MinimizedTimeSeriesLen `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
Symbols []byte `protobuf:"bytes,4,opt,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestLen) Reset() { *m = MinimizedWriteRequestLen{} }
func (m *MinimizedWriteRequestLen) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestLen) ProtoMessage() {}
func (*MinimizedWriteRequestLen) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{2}
}
func (m *MinimizedWriteRequestLen) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestLen) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestLen.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestLen) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestLen.Merge(m, src)
}
func (m *MinimizedWriteRequestLen) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestLen) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestLen.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestLen proto.InternalMessageInfo
func (m *MinimizedWriteRequestLen) GetTimeseries() []MinimizedTimeSeriesLen {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestLen) GetSymbols() []byte {
if m != nil {
return m.Symbols
}
return nil
}
type MinimizedWriteRequestStr struct {
Timeseries []MinimizedTimeSeriesStr `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Symbols []string `protobuf:"bytes,4,rep,name=symbols,proto3" json:"symbols,omitempty"`
@ -244,7 +130,7 @@ func (m *MinimizedWriteRequestStr) Reset() { *m = MinimizedWriteRequestS
func (m *MinimizedWriteRequestStr) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestStr) ProtoMessage() {}
func (*MinimizedWriteRequestStr) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3}
return fileDescriptor_eefc82927d57d89b, []int{1}
}
func (m *MinimizedWriteRequestStr) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -287,61 +173,6 @@ func (m *MinimizedWriteRequestStr) GetSymbols() []string {
return nil
}
type MinimizedWriteRequestStrFixed struct {
Timeseries []MinimizedTimeSeriesStrFixed `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Symbols []string `protobuf:"bytes,4,rep,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestStrFixed) Reset() { *m = MinimizedWriteRequestStrFixed{} }
func (m *MinimizedWriteRequestStrFixed) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestStrFixed) ProtoMessage() {}
func (*MinimizedWriteRequestStrFixed) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4}
}
func (m *MinimizedWriteRequestStrFixed) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestStrFixed) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestStrFixed.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestStrFixed) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestStrFixed.Merge(m, src)
}
func (m *MinimizedWriteRequestStrFixed) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestStrFixed) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestStrFixed.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestStrFixed proto.InternalMessageInfo
func (m *MinimizedWriteRequestStrFixed) GetTimeseries() []MinimizedTimeSeriesStrFixed {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestStrFixed) GetSymbols() []string {
if m != nil {
return m.Symbols
}
return nil
}
// ReadRequest represents a remote read request.
type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"`
@ -360,7 +191,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5}
return fileDescriptor_eefc82927d57d89b, []int{2}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -416,7 +247,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{6}
return fileDescriptor_eefc82927d57d89b, []int{3}
}
func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -466,7 +297,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{7}
return fileDescriptor_eefc82927d57d89b, []int{4}
}
func (m *Query) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -535,7 +366,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{8}
return fileDescriptor_eefc82927d57d89b, []int{5}
}
func (m *QueryResult) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -588,7 +419,7 @@ func (m *ChunkedReadResponse) Reset() { *m = ChunkedReadResponse{} }
func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) }
func (*ChunkedReadResponse) ProtoMessage() {}
func (*ChunkedReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{9}
return fileDescriptor_eefc82927d57d89b, []int{6}
}
func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -634,10 +465,7 @@ func (m *ChunkedReadResponse) GetQueryIndex() int64 {
func init() {
proto.RegisterEnum("prometheus.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value)
proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest")
proto.RegisterType((*MinimizedWriteRequest)(nil), "prometheus.MinimizedWriteRequest")
proto.RegisterType((*MinimizedWriteRequestLen)(nil), "prometheus.MinimizedWriteRequestLen")
proto.RegisterType((*MinimizedWriteRequestStr)(nil), "prometheus.MinimizedWriteRequestStr")
proto.RegisterType((*MinimizedWriteRequestStrFixed)(nil), "prometheus.MinimizedWriteRequestStrFixed")
proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse")
proto.RegisterType((*Query)(nil), "prometheus.Query")
@ -648,45 +476,42 @@ func init() {
func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) }
var fileDescriptor_eefc82927d57d89b = []byte{
// 608 bytes of a gzipped FileDescriptorProto // 552 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x4d, 0x6f, 0xd3, 0x4e, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcb, 0x6e, 0xd3, 0x4c,
0x10, 0xc6, 0xbb, 0x75, 0xda, 0xe4, 0x3f, 0xce, 0xbf, 0x0a, 0xdb, 0x96, 0x9a, 0x4a, 0xb4, 0x95, 0x14, 0xae, 0xeb, 0xb4, 0xc9, 0x7f, 0xdc, 0xbf, 0x32, 0xd3, 0x96, 0x9a, 0x2c, 0xd2, 0xc8, 0x62,
0x85, 0x44, 0xa4, 0xa2, 0x20, 0x4a, 0xc5, 0xa9, 0x07, 0xda, 0x12, 0x54, 0x4a, 0xcc, 0xcb, 0x3a, 0x61, 0xa9, 0x28, 0x88, 0x50, 0xb1, 0xea, 0x82, 0xb4, 0x44, 0x0a, 0xa5, 0xe6, 0x32, 0x0e, 0x02,
0x08, 0x84, 0x90, 0x2c, 0x27, 0x1e, 0x35, 0x16, 0xf5, 0x4b, 0x77, 0xd7, 0x52, 0xc3, 0x99, 0x13, 0x21, 0x24, 0xcb, 0xb1, 0x8f, 0x1a, 0x8b, 0xf8, 0xd2, 0x99, 0xb1, 0xd4, 0xf0, 0x08, 0xac, 0x78,
0x27, 0x0e, 0x7c, 0x22, 0x4e, 0x3d, 0x21, 0x3e, 0x01, 0x42, 0xfd, 0x24, 0xc8, 0x6f, 0x61, 0x03, 0x26, 0x56, 0x5d, 0x21, 0x9e, 0x00, 0xa1, 0x3c, 0x09, 0xf2, 0x2d, 0x9d, 0x40, 0x17, 0xec, 0x3c,
0x29, 0x81, 0xde, 0xec, 0x99, 0xe7, 0x79, 0xf2, 0xdb, 0xc9, 0x78, 0xa1, 0xce, 0x31, 0x88, 0x24, 0xe7, 0xbb, 0x9c, 0x6f, 0xce, 0x1c, 0xc3, 0x16, 0xc3, 0x28, 0x11, 0xd8, 0x4b, 0x59, 0x22, 0x12,
0xb6, 0x62, 0x1e, 0xc9, 0x88, 0x42, 0xcc, 0xa3, 0x00, 0xe5, 0x00, 0x13, 0xb1, 0xaa, 0xcb, 0x61, 0x02, 0x29, 0x4b, 0x22, 0x14, 0x53, 0xcc, 0x78, 0x5b, 0x13, 0xf3, 0x14, 0x79, 0x09, 0xb4, 0x77,
0x8c, 0x22, 0x6f, 0xac, 0x2e, 0x1d, 0x45, 0x47, 0x51, 0xf6, 0x78, 0x3b, 0x7d, 0xca, 0xab, 0xe6, 0x2f, 0x92, 0x8b, 0xa4, 0xf8, 0x7c, 0x98, 0x7f, 0x95, 0x55, 0xf3, 0xab, 0x02, 0x5b, 0xef, 0x58,
0x47, 0x02, 0xf5, 0x97, 0xdc, 0x97, 0xc8, 0xf0, 0x24, 0x41, 0x21, 0xe9, 0x0e, 0x80, 0xf4, 0x03, 0x28, 0x90, 0xe2, 0x65, 0x86, 0x5c, 0x90, 0x63, 0x00, 0x11, 0x46, 0xc8, 0x91, 0x85, 0xc8, 0x0d,
0x14, 0xc8, 0x7d, 0x14, 0x06, 0xd9, 0xd0, 0x9a, 0xfa, 0xd6, 0xd5, 0xd6, 0xcf, 0xd0, 0x56, 0xd7, 0xa5, 0xab, 0x5a, 0x5a, 0xff, 0x6e, 0xef, 0xc6, 0xb4, 0x37, 0x0e, 0x23, 0x74, 0x0a, 0xf4, 0xa4,
0x0f, 0xd0, 0xce, 0xba, 0x7b, 0x95, 0xb3, 0x6f, 0xeb, 0x33, 0x4c, 0xd1, 0xd3, 0x1d, 0xa8, 0x05, 0x71, 0xfd, 0xf3, 0x60, 0x8d, 0x4a, 0x7c, 0x72, 0x0c, 0xad, 0x08, 0x85, 0x17, 0x78, 0xc2, 0x33,
0x28, 0x5d, 0xcf, 0x95, 0xae, 0xa1, 0x65, 0xde, 0x55, 0xd5, 0x6b, 0xa1, 0xe4, 0x7e, 0xdf, 0x2a, 0xd4, 0x42, 0xdb, 0x96, 0xb5, 0x36, 0x0a, 0x16, 0xfa, 0x76, 0xc5, 0xa8, 0xf4, 0x4b, 0xc5, 0x59,
0x14, 0x85, 0x7f, 0xe4, 0x38, 0xac, 0xd4, 0x66, 0x1b, 0x9a, 0xf9, 0x9e, 0xc0, 0xb2, 0xe5, 0x87, 0xa3, 0xb5, 0xae, 0xab, 0xe6, 0x17, 0x05, 0x0c, 0x3b, 0x8c, 0xc3, 0x28, 0xfc, 0x8c, 0x81, 0x9c,
0x7e, 0xe0, 0xbf, 0x43, 0x6f, 0x8c, 0xad, 0x3d, 0x81, 0x6d, 0x7d, 0x2c, 0xbf, 0xb4, 0xfd, 0x11, 0xcd, 0x11, 0x8c, 0x8c, 0x6e, 0x89, 0x67, 0xae, 0xb4, 0xa8, 0x95, 0x37, 0x39, 0x1d, 0xc1, 0x6e,
0xd2, 0x80, 0xaa, 0x18, 0x06, 0xbd, 0xe8, 0x58, 0x18, 0x95, 0x0d, 0xd2, 0xfc, 0x8f, 0x95, 0xaf, 0x89, 0x6a, 0x40, 0x93, 0xcf, 0xa3, 0x49, 0x32, 0xe3, 0x46, 0xa3, 0xab, 0x5a, 0xff, 0xd1, 0xfa,
0x39, 0xc0, 0x61, 0xa5, 0xa6, 0x35, 0x2a, 0xe6, 0x07, 0x02, 0xc6, 0x44, 0x8c, 0x0e, 0x86, 0xf4, 0x58, 0xc6, 0x38, 0x6b, 0xb4, 0x54, 0xbd, 0x61, 0x7e, 0x57, 0x40, 0xa3, 0xe8, 0x05, 0xf5, 0x78,
0x60, 0x02, 0x89, 0x39, 0x85, 0xa4, 0x83, 0xe1, 0x74, 0x98, 0xfa, 0x3f, 0xc2, 0xd8, 0x92, 0x5f, 0x0e, 0xa1, 0x79, 0x99, 0xc9, 0xcd, 0xef, 0xc8, 0xcd, 0xdf, 0x64, 0xc8, 0xe6, 0xb4, 0x66, 0x90,
0x0a, 0xc6, 0x96, 0x7c, 0x1a, 0x8c, 0x76, 0xd1, 0x64, 0x3e, 0x11, 0xb8, 0x7e, 0x11, 0xcc, 0x43, 0x8f, 0xb0, 0xef, 0xf9, 0x3e, 0xa6, 0x02, 0x03, 0x97, 0x21, 0x4f, 0x93, 0x98, 0xa3, 0x5b, 0xbc,
0xff, 0x14, 0x3d, 0x6a, 0x4d, 0x20, 0xba, 0x39, 0x9d, 0x28, 0x33, 0x5f, 0x1a, 0xeb, 0x0b, 0x01, 0x89, 0xb1, 0xde, 0x55, 0xad, 0xed, 0xfe, 0x7d, 0x59, 0x2c, 0xb5, 0xe9, 0xd1, 0x8a, 0x3d, 0x9e,
0x9d, 0xa1, 0xeb, 0x95, 0xdb, 0xb2, 0x09, 0xd5, 0x93, 0x44, 0x25, 0xb8, 0xa2, 0x12, 0x3c, 0x4f, 0xa7, 0x48, 0xf7, 0x6a, 0x13, 0xb9, 0xca, 0xcd, 0x23, 0xd8, 0x92, 0x0b, 0x44, 0x83, 0xa6, 0x33,
0x90, 0x0f, 0x59, 0xa9, 0xa0, 0x6f, 0x60, 0xc5, 0xed, 0xf7, 0x31, 0x96, 0xe8, 0x39, 0x1c, 0x45, 0xb0, 0x5f, 0x9f, 0x0f, 0x1d, 0x7d, 0x8d, 0xec, 0xc3, 0x8e, 0x33, 0xa6, 0xc3, 0x81, 0x3d, 0x7c,
0x1c, 0x85, 0x02, 0x9d, 0xec, 0xf3, 0x31, 0x66, 0x37, 0xb4, 0xe6, 0xc2, 0xd6, 0x0d, 0xd5, 0xac, 0xe6, 0xbe, 0x7f, 0x45, 0xdd, 0xd3, 0xd1, 0xdb, 0x97, 0x2f, 0x1c, 0x5d, 0x31, 0x07, 0xb9, 0xca,
0xfc, 0x4c, 0x8b, 0x15, 0xea, 0xee, 0x30, 0x46, 0xb6, 0x5c, 0x86, 0xa8, 0x55, 0x61, 0x6e, 0x43, 0x5b, 0x5a, 0x91, 0x47, 0xd0, 0x64, 0xc8, 0xb3, 0x99, 0xa8, 0x2f, 0xb4, 0xff, 0xf7, 0x85, 0x0a,
0x5d, 0x2d, 0x50, 0x1d, 0xaa, 0xf6, 0xae, 0xf5, 0xac, 0xd3, 0xb6, 0x1b, 0x33, 0x74, 0x05, 0x16, 0x9c, 0xd6, 0x3c, 0xf3, 0x9b, 0x02, 0x1b, 0x05, 0x40, 0x1e, 0x00, 0xe1, 0xc2, 0x63, 0xc2, 0x2d,
0xed, 0x2e, 0x6b, 0xef, 0x5a, 0xed, 0x07, 0xce, 0xab, 0xa7, 0xcc, 0xd9, 0x3f, 0x78, 0xf1, 0xe4, 0xe6, 0x2a, 0xbc, 0x28, 0x75, 0xa3, 0xdc, 0x47, 0xb1, 0x54, 0xaa, 0x17, 0xc8, 0xb8, 0x06, 0x6c,
0xb1, 0xdd, 0x20, 0xe6, 0x6e, 0xea, 0x72, 0x47, 0x51, 0xf4, 0x0e, 0x54, 0x39, 0x8a, 0xe4, 0x58, 0x4e, 0x2c, 0xd0, 0x31, 0x0e, 0x56, 0xb9, 0xeb, 0x05, 0x77, 0x1b, 0xe3, 0x40, 0x66, 0x1e, 0x41,
0x96, 0x07, 0x5a, 0xf9, 0xfd, 0x40, 0x59, 0x9f, 0x95, 0x3a, 0xf3, 0x33, 0x81, 0xb9, 0xac, 0x41, 0x2b, 0xf2, 0x84, 0x3f, 0x45, 0xc6, 0xab, 0x35, 0x32, 0xe4, 0x54, 0xe7, 0xde, 0x04, 0x67, 0x76,
0x6f, 0x01, 0x15, 0xd2, 0xe5, 0xd2, 0xc9, 0xe6, 0x2a, 0xdd, 0x20, 0x76, 0x82, 0x34, 0x87, 0x34, 0x49, 0xa0, 0x4b, 0x26, 0x39, 0x84, 0x8d, 0x69, 0x18, 0x8b, 0xfc, 0x3d, 0x15, 0x4b, 0xeb, 0xef,
0x35, 0xd6, 0xc8, 0x3a, 0xdd, 0xb2, 0x61, 0x09, 0xda, 0x84, 0x06, 0x86, 0xde, 0xb8, 0x76, 0x36, 0xfd, 0x39, 0xdc, 0x51, 0x0e, 0xd2, 0x92, 0x63, 0x0e, 0x41, 0x93, 0x2e, 0x47, 0x9e, 0xfc, 0xfb,
0xd3, 0x2e, 0x60, 0xe8, 0xa9, 0xca, 0x6d, 0xa8, 0x05, 0xae, 0xec, 0x0f, 0x90, 0x8b, 0xe2, 0x8b, 0xda, 0xcb, 0x5b, 0x64, 0x5e, 0xc1, 0xce, 0xe9, 0x34, 0x8b, 0x3f, 0xe5, 0x8f, 0x23, 0x4d, 0xf5,
0x37, 0x54, 0xaa, 0x8e, 0xdb, 0xc3, 0x63, 0x2b, 0x17, 0xb0, 0x91, 0x92, 0x6e, 0xc2, 0xdc, 0xc0, 0x29, 0x6c, 0xfb, 0x65, 0xd9, 0x5d, 0xb1, 0xbc, 0x27, 0x5b, 0x56, 0xc2, 0xca, 0xf5, 0x7f, 0x5f,
0x0f, 0x65, 0xbe, 0xf3, 0xfa, 0xd6, 0xf2, 0xaf, 0xc3, 0x3d, 0x48, 0x9b, 0x2c, 0xd7, 0x98, 0x6d, 0x3e, 0x92, 0x03, 0xd0, 0xf2, 0x35, 0x9a, 0xbb, 0x61, 0x1c, 0xe0, 0x55, 0x35, 0x27, 0x28, 0x4a,
0xd0, 0x95, 0xc3, 0xd1, 0x7b, 0x7f, 0x7f, 0x43, 0xa9, 0x5b, 0x64, 0x9e, 0xc2, 0xe2, 0xfe, 0x20, 0xcf, 0xf3, 0xca, 0xc9, 0xee, 0xf5, 0xa2, 0xa3, 0xfc, 0x58, 0x74, 0x94, 0x5f, 0x8b, 0x8e, 0xf2,
0x09, 0xdf, 0xa6, 0x7f, 0x8e, 0x32, 0xd5, 0xfb, 0xb0, 0xd0, 0xcf, 0xcb, 0xce, 0x58, 0xe4, 0x35, 0x61, 0x33, 0xf7, 0x4d, 0x27, 0x93, 0xcd, 0xe2, 0xb7, 0x7e, 0xfc, 0x3b, 0x00, 0x00, 0xff, 0xff,
0x35, 0xb2, 0x30, 0x16, 0xa9, 0xff, 0xf7, 0xd5, 0x57, 0xba, 0x0e, 0x7a, 0xba, 0x46, 0x43, 0xc7, 0x8b, 0x1e, 0x3d, 0x82, 0x15, 0x04, 0x00, 0x00,
0x0f, 0x3d, 0x3c, 0x2d, 0xe6, 0x04, 0x59, 0xe9, 0x51, 0x5a, 0xd9, 0x5b, 0x3a, 0x3b, 0x5f, 0x23,
0x5f, 0xcf, 0xd7, 0xc8, 0xf7, 0xf3, 0x35, 0xf2, 0x7a, 0x3e, 0xcd, 0x8d, 0x7b, 0xbd, 0xf9, 0xec,
0x06, 0xbe, 0xfb, 0x23, 0x00, 0x00, 0xff, 0xff, 0xf6, 0xd2, 0x59, 0xc2, 0xc0, 0x05, 0x00, 0x00,
}
func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
@ -744,102 +569,6 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestLen) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestLen) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestLen) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
i -= len(m.Symbols)
copy(dAtA[i:], m.Symbols)
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols)))
i--
dAtA[i] = 0x22
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestStr) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -890,56 +619,6 @@ func (m *MinimizedWriteRequestStr) MarshalToSizedBuffer(dAtA []byte) (int, error
return len(dAtA) - i, nil
}
func (m *MinimizedWriteRequestStrFixed) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestStrFixed) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestStrFixed) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x22
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -1225,50 +904,6 @@ func (m *WriteRequest) Size() (n int) {
return n
}
func (m *MinimizedWriteRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
l = len(m.Symbols)
if l > 0 {
n += 1 + l + sovRemote(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *MinimizedWriteRequestLen) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
l = len(m.Symbols)
if l > 0 {
n += 1 + l + sovRemote(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *MinimizedWriteRequestStr) Size() (n int) {
if m == nil {
return 0
@ -1293,30 +928,6 @@ func (m *MinimizedWriteRequestStr) Size() (n int) {
return n
}
func (m *MinimizedWriteRequestStrFixed) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if len(m.Symbols) > 0 {
for _, s := range m.Symbols {
l = len(s)
n += 1 + l + sovRemote(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *ReadRequest) Size() (n int) {
if m == nil {
return 0
@ -1552,242 +1163,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *MinimizedWriteRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeries{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequestLen) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestLen: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestLen: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesLen{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols[:0], dAtA[iNdEx:postIndex]...)
if m.Symbols == nil {
m.Symbols = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MinimizedWriteRequestStr) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1905,123 +1280,6 @@ func (m *MinimizedWriteRequestStr) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *MinimizedWriteRequestStrFixed) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestStrFixed: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestStrFixed: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesStrFixed{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0


@ -27,30 +27,6 @@ message WriteRequest {
repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
}
message MinimizedWriteRequest {
repeated MinimizedTimeSeries timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
// Metadata (3) has moved to be part of the TimeSeries type
reserved 3;
// The symbols table. All symbols are concatenated strings. To read the symbols table, it's required
// to know the offset:length range of the actual symbol to read from this string.
string symbols = 4;
}
message MinimizedWriteRequestLen {
repeated MinimizedTimeSeriesLen timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
// Metadata (3) has moved to be part of the TimeSeries type
reserved 3;
// The symbols table. All symbols are concatenated strings prepended with a varint of their length.
// To read the symbols table, it's required to know the offset of the actual symbol to read from this string.
bytes symbols = 4;
}
message MinimizedWriteRequestStr {
repeated MinimizedTimeSeriesStr timeseries = 1 [(gogoproto.nullable) = false];
reserved 2;
@ -58,13 +34,6 @@ message MinimizedWriteRequestStr {
repeated string symbols = 4;
}
message MinimizedWriteRequestStrFixed {
repeated MinimizedTimeSeriesStrFixed timeseries = 1 [(gogoproto.nullable) = false];
reserved 2;
reserved 3;
repeated string symbols = 4;
}
// ReadRequest represents a remote read request.
message ReadRequest {
repeated Query queries = 1;

File diff suppressed because it is too large.


@ -130,52 +130,11 @@ message TimeSeries {
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
}
// based on an experiment by marco
message MinimizedTimeSeries {
// Sorted list of label name-value pair references. This list's len is always multiple of 4,
// packing tuples of (label name offset, label name length, label value offset, label value length).
// Offsets point to the symbol table in the higher level MinimizedWriteRequestLen.
repeated uint32 label_symbols = 1 [(gogoproto.nullable) = false];
// Sorted by time, oldest sample first.
// TODO: support references for other types
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message MinimizedTimeSeriesLen {
// Sorted list of label name-value pair references, encoded as 32bit uint. This
// list's real len is always multiple of 2, label name offset/label value offset.
// Offsets point to the symbol table in the higher level MinimizedWriteRequestLen.
repeated fixed32 label_symbols = 1;
// Sorted by time, oldest sample first.
// TODO: support references for other types
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message MinimizedTimeSeriesStr {
// Sorted list of label name-value pair references, encoded as indices to a strings array.
// This list's len is always multiple of 2.
repeated uint32 label_symbols = 1;
repeated uint32 label_symbols = 1 [(gogoproto.nullable) = false];
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message MinimizedTimeSeriesStrFixed {
// Sorted list of label name-value pair references, encoded as indices to a strings array.
// This list's len is always multiple of 2.
repeated fixed32 label_symbols = 1;
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
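The comments in MinimizedTimeSeriesStr only describe the index-pair layout; the sender-side rwSymbolTable and the decoding helper Uint32StrRefToLabels are largely outside this diff. A rough, self-contained sketch of the round trip, using a plain map-based interner as a stand-in for rwSymbolTable:

package main

import "fmt"

// symTable is a stand-in for the sender's rwSymbolTable: it interns each
// distinct label string once and hands back its index.
type symTable struct {
    symbols []string
    index   map[string]uint32
}

func newSymTable() *symTable {
    return &symTable{index: map[string]uint32{}}
}

func (t *symTable) ref(s string) uint32 {
    if i, ok := t.index[s]; ok {
        return i
    }
    i := uint32(len(t.symbols))
    t.symbols = append(t.symbols, s)
    t.index[s] = i
    return i
}

func main() {
    // Encode: every label name/value becomes an index pair, roughly what
    // labelsToUint32SliceStr does with labels.Labels.
    t := newSymTable()
    labelPairs := [][2]string{{"__name__", "up"}, {"job", "node"}}
    var refs []uint32
    for _, l := range labelPairs {
        refs = append(refs, t.ref(l[0]), t.ref(l[1]))
    }

    // Decode: walk the pairs and look the strings back up, roughly what
    // Uint32StrRefToLabels does on the receiving side.
    for i := 0; i+1 < len(refs); i += 2 {
        fmt.Printf("%s=%q\n", t.symbols[refs[i]], t.symbols[refs[i+1]])
    }
}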


@ -7,10 +7,7 @@ trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
declare -a INSTANCES
# (sender,receiver) pairs to run: (sender_name; sender_flags; receiver_name; receiver_flags)
INSTANCES+=('sender-v1;;receiver-v1;')
INSTANCES+=('sender-v11-min32-optimized-varint;--remote-write-format 1;receiver-v11-min32-optimized-varint;--remote-write-format 1')
INSTANCES+=('sender-v11;--remote-write-format 1;receiver-v11;--remote-write-format 1')
INSTANCES+=('sender-v11-min-len;--remote-write-format 2;receiver-v11-min-len;--remote-write-format 2')
INSTANCES+=('sender-v11-min-str;--remote-write-format 3;receiver-v11-min-str;--remote-write-format 3')
INSTANCES+=('sender-v11-min-str-fixed;--remote-write-format 4;receiver-v11-min-str-fixed;--remote-write-format 4')
# ~~~~~~~~~~~~~ # ~~~~~~~~~~~~~


@ -15,7 +15,6 @@ package remote
import (
"compress/gzip"
"encoding/binary"
"errors"
"fmt"
"io"
@ -24,7 +23,6 @@ import (
"sort"
"strings"
"sync"
"unsafe"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@ -785,28 +783,6 @@ func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label
return result
}
func labelsToUint32Slice(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
result := buf[:0]
lbls.Range(func(l labels.Label) {
off, leng := symbolTable.Ref(l.Name)
result = append(result, off, leng)
off, leng = symbolTable.Ref(l.Value)
result = append(result, off, leng)
})
return result
}
func labelsToUint32SliceLen(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
result := buf[:0]
lbls.Range(func(l labels.Label) {
off := symbolTable.RefLen(l.Name)
result = append(result, off)
off = symbolTable.RefLen(l.Value)
result = append(result, off)
})
return result
}
func labelsToUint32SliceStr(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
result := buf[:0]
lbls.Range(func(l labels.Label) {
@ -818,29 +794,6 @@ func labelsToUint32SliceStr(lbls labels.Labels, symbolTable *rwSymbolTable, buf
return result
}
func Uint32LenRefToLabels(symbols []byte, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
labelIdx := 0
for labelIdx < len(minLabels) {
// todo, check for overflow?
offset := minLabels[labelIdx]
labelIdx++
length, n := binary.Uvarint(symbols[offset:])
offset += uint32(n)
name := symbols[offset : uint64(offset)+length]
offset = minLabels[labelIdx]
labelIdx++
length, n = binary.Uvarint(symbols[offset:])
offset += uint32(n)
value := symbols[offset : uint64(offset)+length]
ls.Add(yoloString(name), yoloString(value))
}
return ls.Labels()
}
func Uint32StrRefToLabels(symbols []string, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
@ -858,33 +811,6 @@ func Uint32StrRefToLabels(symbols []string, minLabels []uint32) labels.Labels {
return ls.Labels()
}
func yoloString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
func Uint32RefToLabels(symbols string, minLabels []uint32) labels.Labels {
ls := labels.NewScratchBuilder(len(minLabels) / 2)
labelIdx := 0
for labelIdx < len(minLabels) {
// todo, check for overflow?
offset := minLabels[labelIdx]
labelIdx++
length := minLabels[labelIdx]
labelIdx++
name := symbols[offset : offset+length]
// todo, check for overflow?
offset = minLabels[labelIdx]
labelIdx++
length = minLabels[labelIdx]
labelIdx++
value := symbols[offset : offset+length]
ls.Add(name, value)
}
return ls.Labels()
}
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType {
mt := strings.ToUpper(string(t))
@ -970,46 +896,6 @@ func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
return otlpReq, nil
}
// DecodeMinimizedWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestLen
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func DecodeMinimizedWriteRequestStr(r io.Reader) (*prompb.MinimizedWriteRequestStr, error) {
compressed, err := io.ReadAll(r)
if err != nil {
@ -1029,33 +915,14 @@ func DecodeMinimizedWriteRequestStr(r io.Reader) (*prompb.MinimizedWriteRequestS
return &req, nil
}
func DecodeMinimizedWriteRequestStrFixed(r io.Reader) (*prompb.MinimizedWriteRequestStrFixed, error) { func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequestStr) (*prompb.WriteRequest, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.MinimizedWriteRequestStrFixed
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
// TODO handle metadata?
}
for i, rts := range redReq.Timeseries {
Uint32RefToLabels(redReq.Symbols, rts.LabelSymbols).Range(func(l labels.Label) {
Uint32StrRefToLabels(redReq.Symbols, rts.LabelSymbols).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name,
Value: l.Value,

@ -75,7 +75,7 @@ var writeRequestFixture = &prompb.WriteRequest{
}
// writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation.
var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequestStr {
st := newRwSymbolTable()
var labels []uint32
for _, s := range []string{
@ -85,11 +85,11 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
"d", "e",
"foo", "bar",
} {
off, length := st.Ref(s)
labels = append(labels, off, length)
ref := st.RefStr(s)
labels = append(labels, ref)
}
return &prompb.MinimizedWriteRequest{
Timeseries: []prompb.MinimizedTimeSeries{
return &prompb.MinimizedWriteRequestStr{
Timeseries: []prompb.MinimizedTimeSeriesStr{
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
@ -103,7 +103,7 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
},
},
Symbols: st.LabelsString(),
Symbols: st.LabelsStrings(),
}
}()
@ -559,11 +559,11 @@ func TestDecodeWriteRequest(t *testing.T) {
}
func TestDecodeMinWriteRequest(t *testing.T) {
buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
buf, _, err := buildMinimizedWriteRequestStr(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
require.NoError(t, err)
actual, err := DecodeMinimizedWriteRequest(bytes.NewReader(buf))
actual, err := DecodeMinimizedWriteRequestStr(bytes.NewReader(buf))
require.NoError(t, err)
require.Equal(t, writeRequestMinimizedFixture, actual)
}
@ -887,10 +887,10 @@ func (c *mockChunkIterator) Err() error {
return nil
}
func TestLenFormat(t *testing.T) {
func TestStrFormat(t *testing.T) {
r := newRwSymbolTable()
ls := labels.FromStrings("asdf", "qwer", "zxcv", "1234")
encoded := labelsToUint32SliceLen(ls, &r, nil)
decoded := Uint32LenRefToLabels(r.LabelsData(), encoded)
encoded := labelsToUint32SliceStr(ls, &r, nil)
decoded := Uint32StrRefToLabels(r.LabelsStrings(), encoded)
require.Equal(t, ls, decoded)
}
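On the receiving side, the two helpers kept by this commit compose naturally: DecodeMinimizedWriteRequestStr parses the snappy-compressed body, and MinimizedWriteRequestToWriteRequest turns the result back into a classic WriteRequest. A hedged usage sketch; the handler itself, the endpoint path, and the import path are assumptions, not part of this commit:

package main

import (
    "net/http"

    "github.com/prometheus/prometheus/storage/remote"
)

// handleMinimizedWrite is a hypothetical receiver endpoint; only the two
// helper calls are taken from this commit.
func handleMinimizedWrite(w http.ResponseWriter, r *http.Request) {
    minReq, err := remote.DecodeMinimizedWriteRequestStr(r.Body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Convert back to the classic WriteRequest so the rest of the
    // ingestion path can stay unchanged.
    req, err := remote.MinimizedWriteRequestToWriteRequest(minReq)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    _ = req // hand off to the appender / storage layer here
}

func main() {
    http.HandleFunc("/api/v1/write", handleMinimizedWrite) // illustrative path
    _ = http.ListenAndServe(":9999", nil)
}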


@ -393,10 +393,7 @@ type RemoteWriteFormat int64
const (
Base1 RemoteWriteFormat = iota // original map based format
Min32Optimized // two 32bit varint plus marshalling optimization
MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice)
MinStrings // symbols are indices into an array of strings
MinStringsFix // symbols are indices into an array of strings. Indices are fixed length
)
// QueueManager manages a queue of samples to be sent to the Storage
@ -1384,26 +1381,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}
pendingMinimizedData := make([]prompb.MinimizedTimeSeries, max)
for i := range pendingMinimizedData {
pendingMinimizedData[i].Samples = []prompb.Sample{{}}
}
pendingMinLenData := make([]prompb.MinimizedTimeSeriesLen, max)
for i := range pendingMinLenData {
pendingMinLenData[i].Samples = []prompb.Sample{{}}
}
pendingMinStrData := make([]prompb.MinimizedTimeSeriesStr, max)
for i := range pendingMinStrData {
pendingMinStrData[i].Samples = []prompb.Sample{{}}
}
pendingMinStrFixedData := make([]prompb.MinimizedTimeSeriesStrFixed, max)
for i := range pendingMinStrFixedData {
pendingMinStrFixedData[i].Samples = []prompb.Sample{{}}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
if !timer.Stop() {
@ -1443,26 +1425,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
case Min32Optimized:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear()
case MinLen:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
case MinStrings:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear()
case MinStringsFix:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStrFixed(&symbolTable, batch, pendingMinStrFixedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrFixedSamples(ctx, pendingMinStrFixedData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
@ -1480,30 +1447,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
case Min32Optimized:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear()
case MinLen:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
case MinStrings: case MinStrings:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms) nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
symbolTable.clear() symbolTable.clear()
case MinStringsFix:
nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStrFixed(&symbolTable, batch, pendingMinStrFixedData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendMinStrFixedSamples(ctx, pendingMinStrFixedData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
symbolTable.clear()
} }
} }
queue.ReturnForReuse(batch) queue.ReturnForReuse(batch)
@ -1564,30 +1512,6 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) sendMinStrSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesStr, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) { func (s *shards) sendMinStrSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesStr, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf *[]byte, buf *[]byte) {
begin := time.Now() begin := time.Now()
// Build the ReducedWriteRequest with no metadata. // Build the ReducedWriteRequest with no metadata.
@ -1600,18 +1524,6 @@ func (s *shards) sendMinStrSamples(ctx context.Context, samples []prompb.Minimiz
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinStrFixedSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesStrFixed, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
// Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
req, highest, err := buildMinimizedWriteRequestStrFixed(samples, labels, pBuf, buf)
if err == nil {
err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
}
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
}
func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) { func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) {
if err != nil { if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
@ -1694,84 +1606,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
return err return err
} }
func populateMinimizedTimeSeries(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32Slice(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func populateMinimizedTimeSeriesLen(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesLen, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceLen(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
// TODO: handle all exemplars
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesStr, sendExemplars, sendNativeHistograms bool) (int, int, int) { func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesStr, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch { for nPending, d := range batch {
@ -1814,48 +1648,6 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri
return nPendingSamples, nPendingExemplars, nPendingHistograms return nPendingSamples, nPendingExemplars, nPendingHistograms
} }
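The populate helpers, both the removed variants above and presumably the surviving Str one, keep allocations down by reslicing the per-shard pending buffers to length zero and appending into the existing backing arrays. A generic sketch of that reuse pattern, with toy types rather than the real queue manager structs:

package main

import "fmt"

type sample struct {
	ts    int64
	value float64
}

// fill keeps the backing array of buf and only resets its length,
// so repeated batches append into already-allocated memory.
func fill(buf []sample, batch []float64) []sample {
	buf = buf[:0]
	for i, v := range batch {
		buf = append(buf, sample{ts: int64(i), value: v})
	}
	return buf
}

func main() {
	buf := make([]sample, 0, 4)
	for _, batch := range [][]float64{{1, 2}, {3, 4, 5}} {
		buf = fill(buf, batch)
		fmt.Println(len(buf), cap(buf)) // capacity survives across batches
	}
}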
func populateMinimizedTimeSeriesStrFixed(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesStrFixed, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols)
switch d.sType {
case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
Value: d.value,
Timestamp: d.timestamp,
})
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value,
Timestamp: d.timestamp,
})
nPendingExemplars++
case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, HistogramToHistogramProto(d.timestamp, d.histogram))
nPendingHistograms++
case tFloatHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff backoff := cfg.MinBackoff
sleepDuration := model.Duration(0) sleepDuration := model.Duration(0)
@ -2033,97 +1825,6 @@ func (r *rwSymbolTable) clear() {
r.symbols = r.symbols[:0] r.symbols = r.symbols[:0]
} }
func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequest{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = &[]byte{} // For convenience in tests. Not efficient.
}
data, err := req.OptimizedMarshal(*pBuf)
if err != nil {
return nil, 0, err
}
*pBuf = data
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, data)
if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequestLen{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
func buildMinimizedWriteRequestStr(samples []prompb.MinimizedTimeSeriesStr, labels []string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) { func buildMinimizedWriteRequestStr(samples []prompb.MinimizedTimeSeriesStr, labels []string, pBuf *[]byte, buf *[]byte) ([]byte, int64, error) {
var highest int64 var highest int64
for _, ts := range samples { for _, ts := range samples {
@ -2169,49 +1870,3 @@ func buildMinimizedWriteRequestStr(samples []prompb.MinimizedTimeSeriesStr, labe
} }
return compressed, highest, nil return compressed, highest, nil
} }
func buildMinimizedWriteRequestStrFixed(samples []prompb.MinimizedTimeSeriesStrFixed, labels []string, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
}
req := &prompb.MinimizedWriteRequestStrFixed{
Symbols: labels,
Timeseries: samples,
}
if pBuf == nil {
pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient.
} else {
pBuf.Reset()
}
err := pBuf.Marshal(req)
if err != nil {
return nil, 0, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
*buf = (*buf)[0:cap(*buf)]
} else {
buf = &[]byte{}
}
compressed := snappy.Encode(*buf, pBuf.Bytes())
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
// grow the buffer for the next time
*buf = make([]byte, n)
}
return compressed, highest, nil
}
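The removed builders above (and, by the look of its trailing lines, the surviving buildMinimizedWriteRequestStr) share the buffer trick called out in their comments: snappy.Encode only writes into the destination when len(dst) is large enough, so the buffer is resliced to its full capacity before encoding and grown to snappy.MaxEncodedLen afterwards for the next call. A stripped-down sketch of just that pattern:

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

// compress reuses buf across calls: snappy.Encode allocates a fresh slice
// only when len(buf) < snappy.MaxEncodedLen(len(data)), so we expose the
// full capacity first and grow the buffer afterwards for the next call.
func compress(data []byte, buf *[]byte) []byte {
	*buf = (*buf)[:cap(*buf)]
	compressed := snappy.Encode(*buf, data)
	if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
		*buf = make([]byte, n) // grow for next time
	}
	return compressed
}

func main() {
	buf := make([]byte, 0, 16)
	out := compress([]byte("some remote-write payload"), &buf)
	fmt.Println(len(out), cap(buf))
}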


@ -75,11 +75,11 @@ func TestSampleDelivery(t *testing.T) {
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"}, {samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"}, {samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
{rwFormat: Min32Optimized, samples: true, exemplars: false, histograms: false, name: "interned samples only"}, {rwFormat: MinStrings, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
{rwFormat: Min32Optimized, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"}, {rwFormat: MinStrings, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
{rwFormat: Min32Optimized, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"}, {rwFormat: MinStrings, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
{rwFormat: Min32Optimized, samples: false, exemplars: false, histograms: true, name: "interned histograms only"}, {rwFormat: MinStrings, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
{rwFormat: Min32Optimized, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"}, {rwFormat: MinStrings, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
} }
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
@ -204,7 +204,7 @@ func TestMetadataDelivery(t *testing.T) {
} }
func TestSampleDeliveryTimeout(t *testing.T) { func TestSampleDeliveryTimeout(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration // Let's send one less sample than batch size, and wait the timeout duration
n := 9 n := 9
@ -237,7 +237,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
} }
func TestSampleDeliveryOrder(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
ts := 10 ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
@ -339,7 +339,7 @@ func TestSeriesReset(t *testing.T) {
} }
func TestReshard(t *testing.T) { func TestReshard(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
size := 10 // Make bigger to find more races. size := 10 // Make bigger to find more races.
nSeries := 6 nSeries := 6
@ -382,7 +382,7 @@ func TestReshard(t *testing.T) {
} }
func TestReshardRaceWithStop(t *testing.T) { func TestReshardRaceWithStop(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
c := NewTestWriteClient(rwFormat) c := NewTestWriteClient(rwFormat)
var m *QueueManager var m *QueueManager
@ -421,7 +421,7 @@ func TestReshardRaceWithStop(t *testing.T) {
} }
func TestReshardPartialBatch(t *testing.T) { func TestReshardPartialBatch(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
samples, series := createTimeseries(1, 10) samples, series := createTimeseries(1, 10)
@ -467,7 +467,7 @@ func TestReshardPartialBatch(t *testing.T) {
// where a large scrape (> capacity + max samples per send) is appended at the // where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline. // same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) { func TestQueueFilledDeadlock(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
samples, series := createTimeseries(50, 1) samples, series := createTimeseries(50, 1)
@ -509,7 +509,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
} }
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -830,8 +830,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
case Base1: case Base1:
reqProto = &prompb.WriteRequest{} reqProto = &prompb.WriteRequest{}
err = proto.Unmarshal(reqBuf, reqProto) err = proto.Unmarshal(reqBuf, reqProto)
case Min32Optimized: case MinStrings:
var reqMin prompb.MinimizedWriteRequest var reqMin prompb.MinimizedWriteRequestStr
err = proto.Unmarshal(reqBuf, &reqMin) err = proto.Unmarshal(reqBuf, &reqMin)
if err == nil { if err == nil {
reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin) reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin)
@ -1503,7 +1503,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
for _, tc := range testCases { for _, tc := range testCases {
symbolTable := newRwSymbolTable() symbolTable := newRwSymbolTable()
buff := make([]byte, 0) buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch)) seriesBuff := make([]prompb.MinimizedTimeSeriesStr, len(tc.batch))
for i := range seriesBuff { for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}} seriesBuff[i].Samples = []prompb.Sample{{}}
seriesBuff[i].Exemplars = []prompb.Exemplar{{}} seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
@ -1512,16 +1512,16 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
// Warmup buffers // Warmup buffers
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff)
} }
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0 totalSize := 0
for j := 0; j < b.N; j++ { for j := 0; j < b.N; j++ {
populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer() b.ResetTimer()
req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) req, _, err := buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }


@ -73,23 +73,14 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error var err error
var req *prompb.WriteRequest var req *prompb.WriteRequest
var reqMin *prompb.MinimizedWriteRequest
var reqMinLen *prompb.MinimizedWriteRequestLen
var reqMinStr *prompb.MinimizedWriteRequestStr var reqMinStr *prompb.MinimizedWriteRequestStr
var reqMinFixed *prompb.MinimizedWriteRequestStrFixed
// TODO: this should eventually be done via content negotiation/looking at the header // TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat { switch h.rwFormat {
case Base1: case Base1:
req, err = DecodeWriteRequest(r.Body) req, err = DecodeWriteRequest(r.Body)
case Min32Optimized:
reqMin, err = DecodeMinimizedWriteRequest(r.Body)
case MinLen:
reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body)
case MinStrings: case MinStrings:
reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body) reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body)
case MinStringsFix:
reqMinFixed, err = DecodeMinimizedWriteRequestStrFixed(r.Body)
} }
if err != nil { if err != nil {
@ -102,14 +93,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch h.rwFormat { switch h.rwFormat {
case Base1: case Base1:
err = h.write(r.Context(), req) err = h.write(r.Context(), req)
case Min32Optimized:
err = h.writeMin(r.Context(), reqMin)
case MinLen:
err = h.writeMinLen(r.Context(), reqMinLen)
case MinStrings: case MinStrings:
err = h.writeMinStr(r.Context(), reqMinStr) err = h.writeMinStr(r.Context(), reqMinStr)
case MinStringsFix:
err = h.writeMinStrFixed(r.Context(), reqMinFixed)
} }
switch { switch {
@ -305,82 +290,6 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
func (h *writeHandler) writeMin(ctx context.Context, req *prompb.MinimizedWriteRequest) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32RefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}
func (h *writeHandler) writeMinLen(ctx context.Context, req *prompb.MinimizedWriteRequestLen) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32LenRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}
func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWriteRequestStr) (err error) { func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWriteRequestStr) (err error) {
outOfOrderExemplarErrs := 0 outOfOrderExemplarErrs := 0
@ -418,41 +327,3 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWri
return nil return nil
} }
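On the receive side, writeMinStr resolves each series' LabelSymbols against req.Symbols to rebuild the label set (the removed writeMinStrFixed below does the same via Uint32StrRefToLabels). A rough sketch of that lookup, assuming interleaved name/value indices and a toy label type instead of the real labels.Labels:

package main

import "fmt"

type label struct{ name, value string }

// refsToLabels maps interleaved (name, value) symbol references back to
// label pairs using the request's shared symbols table. Illustrative only;
// the real handler builds a labels.Labels before appending to storage.
func refsToLabels(symbols []string, refs []uint32) []label {
	ls := make([]label, 0, len(refs)/2)
	for i := 0; i+1 < len(refs); i += 2 {
		ls = append(ls, label{name: symbols[refs[i]], value: symbols[refs[i+1]]})
	}
	return ls
}

func main() {
	symbols := []string{"__name__", "http_requests_total", "job", "api"}
	fmt.Println(refsToLabels(symbols, []uint32{0, 1, 2, 3}))
}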
func (h *writeHandler) writeMinStrFixed(ctx context.Context, req *prompb.MinimizedWriteRequestStrFixed) (err error) {
outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx)
defer func() {
if err != nil {
_ = app.Rollback()
return
}
err = app.Commit()
}()
for _, ts := range req.Timeseries {
ls := Uint32StrRefToLabels(req.Symbols, ts.LabelSymbols)
err := h.appendSamples(app, ts.Samples, ls)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, ls)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
return nil
}


@ -85,7 +85,7 @@ func TestRemoteWriteHandler(t *testing.T) {
} }
func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil) buf, _, err := buildMinimizedWriteRequestStr(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf)) req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -94,7 +94,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(nil, nil, appendable, Min32Optimized) handler := NewWriteHandler(nil, nil, appendable, MinStrings)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)