rename new proto types and move to separate pkg

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-12-21 11:40:36 -03:00
parent 38c444b942
commit fe41ed9067
16 changed files with 3631 additions and 3057 deletions

View file

@ -65,14 +65,14 @@ func main() {
} }
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
m := make(model.Metric, len(ts.LabelSymbols)/2) m := make(model.Metric, len(ts.LabelsRefs)/2)
labelIdx := 0 labelIdx := 0
for labelIdx < len(ts.LabelSymbols) { for labelIdx < len(ts.LabelsRefs) {
// todo, check for overflow? // todo, check for overflow?
nameIdx := ts.LabelSymbols[labelIdx] nameIdx := ts.LabelsRefs[labelIdx]
labelIdx++ labelIdx++
valueIdx := ts.LabelSymbols[labelIdx] valueIdx := ts.LabelsRefs[labelIdx]
labelIdx++ labelIdx++
name := req.Symbols[nameIdx] name := req.Symbols[nameIdx]
value := req.Symbols[valueIdx] value := req.Symbols[valueIdx]

View file

@ -15,8 +15,6 @@ package prompb
import ( import (
"sync" "sync"
"golang.org/x/exp/slices"
) )
func (m Sample) T() int64 { return m.Timestamp } func (m Sample) T() int64 { return m.Timestamp }
@ -27,11 +25,6 @@ func (h Histogram) IsFloatHistogram() bool {
return ok return ok
} }
func (h MinHistogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*MinHistogram_CountFloat)
return ok
}
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) { func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
size := r.Size() size := r.Size()
data, ok := p.Get().(*[]byte) data, ok := p.Get().(*[]byte)
@ -44,134 +37,3 @@ func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
} }
return r.Marshal() return r.Marshal()
} }
func (m *MinimizedWriteRequestStr) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
dst = make([]byte, siz)
}
dst = dst[:siz]
n, err := m.OptimizedMarshalToSizedBuffer(dst)
if err != nil {
return nil, err
}
return (dst)[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *MinimizedWriteRequestStr) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x1a
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelSymbols in place without extra allocations.
func (m *MinimizedTimeSeriesStr) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelSymbols) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelSymbols {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}

View file

@ -60,7 +60,7 @@ func (x ReadRequest_ResponseType) String() string {
} }
func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) { func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{2, 0} return fileDescriptor_eefc82927d57d89b, []int{1, 0}
} }
type WriteRequest struct { type WriteRequest struct {
@ -118,61 +118,6 @@ func (m *WriteRequest) GetMetadata() []MetricMetadata {
return nil return nil
} }
type MinimizedWriteRequestStr struct {
Timeseries []MinimizedTimeSeriesStr `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Symbols []string `protobuf:"bytes,3,rep,name=symbols,proto3" json:"symbols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MinimizedWriteRequestStr) Reset() { *m = MinimizedWriteRequestStr{} }
func (m *MinimizedWriteRequestStr) String() string { return proto.CompactTextString(m) }
func (*MinimizedWriteRequestStr) ProtoMessage() {}
func (*MinimizedWriteRequestStr) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{1}
}
func (m *MinimizedWriteRequestStr) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MinimizedWriteRequestStr) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MinimizedWriteRequestStr.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MinimizedWriteRequestStr) XXX_Merge(src proto.Message) {
xxx_messageInfo_MinimizedWriteRequestStr.Merge(m, src)
}
func (m *MinimizedWriteRequestStr) XXX_Size() int {
return m.Size()
}
func (m *MinimizedWriteRequestStr) XXX_DiscardUnknown() {
xxx_messageInfo_MinimizedWriteRequestStr.DiscardUnknown(m)
}
var xxx_messageInfo_MinimizedWriteRequestStr proto.InternalMessageInfo
func (m *MinimizedWriteRequestStr) GetTimeseries() []MinimizedTimeSeriesStr {
if m != nil {
return m.Timeseries
}
return nil
}
func (m *MinimizedWriteRequestStr) GetSymbols() []string {
if m != nil {
return m.Symbols
}
return nil
}
// ReadRequest represents a remote read request. // ReadRequest represents a remote read request.
type ReadRequest struct { type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"`
@ -191,7 +136,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{2} return fileDescriptor_eefc82927d57d89b, []int{1}
} }
func (m *ReadRequest) XXX_Unmarshal(b []byte) error { func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -247,7 +192,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{3} return fileDescriptor_eefc82927d57d89b, []int{2}
} }
func (m *ReadResponse) XXX_Unmarshal(b []byte) error { func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -297,7 +242,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{4} return fileDescriptor_eefc82927d57d89b, []int{3}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -366,7 +311,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) } func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {} func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) { func (*QueryResult) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{5} return fileDescriptor_eefc82927d57d89b, []int{4}
} }
func (m *QueryResult) XXX_Unmarshal(b []byte) error { func (m *QueryResult) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -419,7 +364,7 @@ func (m *ChunkedReadResponse) Reset() { *m = ChunkedReadResponse{} }
func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) } func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) }
func (*ChunkedReadResponse) ProtoMessage() {} func (*ChunkedReadResponse) ProtoMessage() {}
func (*ChunkedReadResponse) Descriptor() ([]byte, []int) { func (*ChunkedReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_eefc82927d57d89b, []int{6} return fileDescriptor_eefc82927d57d89b, []int{5}
} }
func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error { func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -465,7 +410,6 @@ func (m *ChunkedReadResponse) GetQueryIndex() int64 {
func init() { func init() {
proto.RegisterEnum("prometheus.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value) proto.RegisterEnum("prometheus.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value)
proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest")
proto.RegisterType((*MinimizedWriteRequestStr)(nil), "prometheus.MinimizedWriteRequestStr")
proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse")
proto.RegisterType((*Query)(nil), "prometheus.Query") proto.RegisterType((*Query)(nil), "prometheus.Query")
@ -476,41 +420,38 @@ func init() {
func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) } func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) }
var fileDescriptor_eefc82927d57d89b = []byte{ var fileDescriptor_eefc82927d57d89b = []byte{
// 544 bytes of a gzipped FileDescriptorProto // 496 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xae, 0x93, 0xb4, 0x09, 0xe3, 0x10, 0x85, 0x6d, 0x4b, 0x4c, 0x0e, 0x69, 0x64, 0x71, 0x88, 0x10, 0xee, 0x26, 0x69, 0x13, 0x8d, 0x43, 0x14, 0xb6, 0x2d, 0x09, 0x39, 0xa4, 0x91, 0xc5, 0x21,
0x54, 0x14, 0x44, 0xa8, 0x38, 0xf5, 0x40, 0x5a, 0x22, 0x05, 0xa8, 0xf9, 0x59, 0x07, 0x81, 0x10, 0x52, 0x51, 0x10, 0xa1, 0xe2, 0xd4, 0x03, 0x69, 0x89, 0x54, 0xa0, 0xe6, 0x67, 0x13, 0x04, 0x42,
0x92, 0xe5, 0xd8, 0xa3, 0xc6, 0x22, 0xfe, 0xe9, 0xee, 0x5a, 0x6a, 0x38, 0xf3, 0x00, 0x3c, 0x13, 0x48, 0xd6, 0xc6, 0x1e, 0x35, 0x16, 0xf5, 0x4f, 0x77, 0xd7, 0x52, 0xf3, 0x16, 0x3c, 0x13, 0xa7,
0xa7, 0x9e, 0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0x79, 0x63, 0xa7, 0x1b, 0xe8, 0x81, 0x9b, 0x77, 0x9e, 0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0x79, 0x6d, 0x87, 0x2d, 0x5c, 0xb8, 0xad, 0xbf, 0x3f,
0xbe, 0x9f, 0xf9, 0x76, 0x76, 0x0c, 0x75, 0x86, 0x61, 0x2c, 0xb0, 0x9f, 0xb0, 0x58, 0xc4, 0x04, 0xcf, 0xcc, 0xce, 0x42, 0x53, 0x60, 0x18, 0x2b, 0x1c, 0x25, 0x22, 0x56, 0x31, 0x85, 0x44, 0xc4,
0x12, 0x16, 0x87, 0x28, 0x66, 0x98, 0xf2, 0xb6, 0x2e, 0x16, 0x09, 0xf2, 0x15, 0xd0, 0xde, 0x3b, 0x21, 0xaa, 0x25, 0xa6, 0xb2, 0x67, 0xa9, 0x55, 0x82, 0x32, 0x27, 0x7a, 0x7b, 0x17, 0xf1, 0x45,
0x8f, 0xcf, 0x63, 0xf9, 0xf9, 0x30, 0xfb, 0x5a, 0x55, 0xcd, 0x6f, 0x1a, 0xd4, 0xdf, 0xb3, 0x40, 0xac, 0x8f, 0x8f, 0xb2, 0x53, 0x8e, 0xda, 0x5f, 0x09, 0x34, 0x3f, 0x88, 0x40, 0x21, 0xc3, 0xab,
0x20, 0xc5, 0x8b, 0x14, 0xb9, 0x20, 0xc7, 0x00, 0x22, 0x08, 0x91, 0x23, 0x0b, 0x90, 0x1b, 0x5a, 0x14, 0xa5, 0xa2, 0xc7, 0x00, 0x2a, 0x08, 0x51, 0xa2, 0x08, 0x50, 0x76, 0xc9, 0xa0, 0x3a, 0xb4,
0xb7, 0xdc, 0xd3, 0x07, 0x77, 0xfb, 0xd7, 0xa6, 0xfd, 0x49, 0x10, 0xa2, 0x2d, 0xd1, 0x93, 0xca, 0xc6, 0xf7, 0x46, 0x7f, 0x42, 0x47, 0xf3, 0x20, 0xc4, 0x99, 0x66, 0x4f, 0x6a, 0x37, 0x3f, 0x0f,
0xd5, 0xaf, 0x83, 0x2d, 0xaa, 0xf0, 0xc9, 0x31, 0xd4, 0x42, 0x14, 0xae, 0xef, 0x0a, 0xd7, 0x28, 0xb6, 0x98, 0xa1, 0xa7, 0xc7, 0xd0, 0x08, 0x51, 0x71, 0x9f, 0x2b, 0xde, 0xad, 0x6a, 0x6f, 0xcf,
0x4b, 0x6d, 0x5b, 0xd5, 0x5a, 0x28, 0x58, 0xe0, 0x59, 0x39, 0x23, 0xd7, 0xaf, 0x15, 0x2f, 0x2a, 0xf4, 0x3a, 0xa8, 0x44, 0xe0, 0x39, 0x85, 0xa2, 0xf0, 0x6f, 0x1c, 0x2f, 0x6b, 0x8d, 0x4a, 0xbb,
0xb5, 0x52, 0xb3, 0x6c, 0x7e, 0xd5, 0xc0, 0xb0, 0x82, 0x28, 0x08, 0x83, 0x2f, 0xe8, 0xab, 0xd9, 0x6a, 0x7f, 0x27, 0x60, 0x31, 0xe4, 0x7e, 0x59, 0xd1, 0x21, 0xd4, 0xaf, 0x52, 0xb3, 0x9c, 0xbb,
0x6c, 0xc1, 0xc8, 0xf8, 0x86, 0x78, 0xe6, 0x46, 0x8b, 0x42, 0x79, 0x9d, 0xd3, 0x16, 0xec, 0x86, 0x66, 0xe4, 0xbb, 0x14, 0xc5, 0x8a, 0x95, 0x0a, 0xfa, 0x19, 0x3a, 0xdc, 0xf3, 0x30, 0x51, 0xe8,
0xa8, 0x06, 0x54, 0xf9, 0x22, 0x9c, 0xc6, 0x73, 0x2e, 0x93, 0xde, 0xa2, 0xc5, 0x31, 0x8f, 0xf1, 0xbb, 0x02, 0x65, 0x12, 0x47, 0x12, 0x5d, 0x3d, 0x86, 0x6e, 0x65, 0x50, 0x1d, 0xb6, 0xc6, 0x0f,
0x43, 0x03, 0x9d, 0xa2, 0xeb, 0x17, 0x83, 0x39, 0x84, 0xea, 0x45, 0xaa, 0xb6, 0xbd, 0xa3, 0xb6, 0x4c, 0xb3, 0xf1, 0x9b, 0x11, 0x2b, 0xd4, 0xf3, 0x55, 0x82, 0x6c, 0xbf, 0x0c, 0x31, 0x51, 0x69,
0x7d, 0x9b, 0x22, 0x5b, 0xd0, 0x82, 0x41, 0x3e, 0x41, 0xcb, 0xf5, 0x3c, 0x4c, 0x04, 0xfa, 0x0e, 0x1f, 0x41, 0xd3, 0x04, 0xa8, 0x05, 0xf5, 0xd9, 0xc4, 0x79, 0x7b, 0x3e, 0x9d, 0xb5, 0xb7, 0x68,
0x43, 0x9e, 0xc4, 0x11, 0x47, 0x47, 0xbe, 0x86, 0x51, 0xea, 0x96, 0x7b, 0x8d, 0xc1, 0x7d, 0x55, 0x07, 0x76, 0x67, 0x73, 0x36, 0x9d, 0x38, 0xd3, 0xe7, 0xee, 0xc7, 0x37, 0xcc, 0x3d, 0x3d, 0x7b,
0xac, 0xb4, 0xe9, 0xd3, 0x9c, 0x3d, 0x59, 0x24, 0x48, 0xf7, 0x0b, 0x13, 0xb5, 0xca, 0xcd, 0x23, 0xff, 0xfa, 0xd5, 0xac, 0x4d, 0xec, 0x49, 0xe6, 0xe2, 0x9b, 0x28, 0xfa, 0x18, 0xea, 0x02, 0x65,
0xa8, 0xab, 0x05, 0xa2, 0x43, 0xd5, 0x1e, 0x5a, 0x6f, 0xce, 0x46, 0x76, 0x73, 0x8b, 0xb4, 0x60, 0x7a, 0xa9, 0xca, 0x86, 0x3a, 0xff, 0x36, 0xa4, 0x79, 0x56, 0xea, 0xec, 0x6f, 0x04, 0xb6, 0x35,
0xd7, 0x9e, 0xd0, 0xd1, 0xd0, 0x1a, 0x3d, 0x73, 0x3e, 0xbc, 0xa6, 0xce, 0xe9, 0xf8, 0xdd, 0xab, 0x41, 0x1f, 0x02, 0x95, 0x8a, 0x0b, 0xe5, 0xea, 0xa9, 0x2b, 0x1e, 0x26, 0x6e, 0x98, 0xe5, 0x90,
0x97, 0x76, 0x53, 0x33, 0x87, 0x99, 0xca, 0x5d, 0x5b, 0x91, 0x47, 0x50, 0x65, 0xc8, 0xd3, 0xb9, 0x61, 0x95, 0xb5, 0x35, 0x33, 0x2f, 0x09, 0x47, 0xd2, 0x21, 0xb4, 0x31, 0xf2, 0x6f, 0x6b, 0x2b,
0x28, 0x2e, 0xd4, 0xfa, 0xf7, 0x42, 0x12, 0xa7, 0x05, 0xcf, 0xfc, 0xae, 0xc1, 0xb6, 0x04, 0xc8, 0x5a, 0xdb, 0xc2, 0xc8, 0x37, 0x95, 0x47, 0xd0, 0x08, 0xb9, 0xf2, 0x96, 0x28, 0x64, 0x71, 0x73,
0x03, 0x20, 0x5c, 0xb8, 0x4c, 0x38, 0x72, 0xa2, 0xc2, 0x0d, 0x13, 0x27, 0xcc, 0x7c, 0xb4, 0x5e, 0x5d, 0xb3, 0xaa, 0x73, 0xbe, 0xc0, 0x4b, 0x27, 0x17, 0xb0, 0x8d, 0x92, 0x1e, 0xc2, 0xf6, 0x32,
0x99, 0x36, 0x25, 0x32, 0x29, 0x00, 0x8b, 0x93, 0x1e, 0x34, 0x31, 0xf2, 0x37, 0xb9, 0x25, 0xc9, 0x88, 0x94, 0xec, 0xd6, 0x06, 0x64, 0x68, 0x8d, 0xf7, 0xff, 0x1e, 0xee, 0x59, 0x46, 0xb2, 0x5c,
0x6d, 0x60, 0xe4, 0xab, 0xcc, 0x23, 0xa8, 0x85, 0xae, 0xf0, 0x66, 0xc8, 0x78, 0xbe, 0x40, 0x86, 0x63, 0x4f, 0xc1, 0x32, 0x9a, 0xa3, 0x4f, 0xff, 0x7f, 0xd3, 0xcc, 0x1d, 0xb3, 0xaf, 0x61, 0xf7,
0x9a, 0xea, 0xcc, 0x9d, 0xe2, 0xdc, 0x5a, 0x11, 0xe8, 0x9a, 0x49, 0x0e, 0x61, 0x7b, 0x16, 0x44, 0x74, 0x99, 0x46, 0x5f, 0xb2, 0xcb, 0x31, 0xa6, 0xfa, 0x0c, 0x5a, 0x5e, 0x0e, 0xbb, 0xb7, 0x22,
0x82, 0x1b, 0x95, 0xae, 0xd6, 0xd3, 0x07, 0xfb, 0x7f, 0x0f, 0x77, 0x9c, 0x81, 0x74, 0xc5, 0x31, 0xef, 0x9b, 0x91, 0x85, 0xb1, 0x48, 0xbd, 0xe3, 0x99, 0x9f, 0xf4, 0x00, 0xac, 0x6c, 0x8d, 0x56,
0x47, 0xa0, 0x2b, 0x97, 0x23, 0x4f, 0xfe, 0x7f, 0xe1, 0xd5, 0xfd, 0x31, 0x2f, 0x61, 0xf7, 0x74, 0x6e, 0x10, 0xf9, 0x78, 0x5d, 0xcc, 0x09, 0x34, 0xf4, 0x22, 0x43, 0x4e, 0xf6, 0x6e, 0xd6, 0x7d,
0x96, 0x46, 0x9f, 0xb3, 0xc7, 0x51, 0xa6, 0xfa, 0x14, 0x1a, 0xde, 0xaa, 0xec, 0x6c, 0x58, 0xde, 0xf2, 0x63, 0xdd, 0x27, 0xbf, 0xd6, 0x7d, 0xf2, 0x69, 0x27, 0xcb, 0x4d, 0x16, 0x8b, 0x1d, 0xfd,
0x53, 0x2d, 0x73, 0x61, 0xee, 0x7a, 0xdb, 0x53, 0x8f, 0xe4, 0x00, 0xf4, 0x6c, 0x8d, 0x16, 0x4e, 0x92, 0x9e, 0xfc, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x13, 0x18, 0x12, 0x0a, 0x88, 0x03, 0x00, 0x00,
0x10, 0xf9, 0x78, 0x99, 0xcf, 0x09, 0x64, 0xe9, 0x79, 0x56, 0x39, 0xd9, 0xbb, 0x5a, 0x76, 0xb4,
0x9f, 0xcb, 0x8e, 0xf6, 0x7b, 0xd9, 0xd1, 0x3e, 0xee, 0x64, 0xbe, 0xc9, 0x74, 0xba, 0x23, 0x7f,
0xe8, 0xc7, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xa5, 0x67, 0x6e, 0xf2, 0x0f, 0x04, 0x00, 0x00,
} }
func (m *WriteRequest) Marshal() (dAtA []byte, err error) { func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
@ -568,56 +509,6 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *MinimizedWriteRequestStr) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MinimizedWriteRequestStr) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MinimizedWriteRequestStr) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintRemote(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0x1a
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) { func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
@ -903,30 +794,6 @@ func (m *WriteRequest) Size() (n int) {
return n return n
} }
func (m *MinimizedWriteRequestStr) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if len(m.Symbols) > 0 {
for _, s := range m.Symbols {
l = len(s)
n += 1 + l + sovRemote(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *ReadRequest) Size() (n int) { func (m *ReadRequest) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
@ -1162,123 +1029,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
} }
return nil return nil
} }
func (m *MinimizedWriteRequestStr) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MinimizedWriteRequestStr: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MinimizedWriteRequestStr: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, MinimizedTimeSeriesStr{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Symbols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Symbols = append(m.Symbols, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadRequest) Unmarshal(dAtA []byte) error { func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0

View file

@ -27,12 +27,6 @@ message WriteRequest {
repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false]; repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
} }
message MinimizedWriteRequestStr {
repeated MinimizedTimeSeriesStr timeseries = 1 [(gogoproto.nullable) = false];
reserved 2; // TODO: should/can we move this for this new proto version?
repeated string symbols = 3;
}
// ReadRequest represents a remote read request. // ReadRequest represents a remote read request.
message ReadRequest { message ReadRequest {
repeated Query queries = 1; repeated Query queries = 1;

File diff suppressed because it is too large Load diff

View file

@ -38,22 +38,6 @@ message MetricMetadata {
string unit = 5; string unit = 5;
} }
message MinMetricMetadata {
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
MetricType type = 1;
uint32 help_ref = 3;
uint32 unit_ref = 4;
}
message Sample { message Sample {
double value = 1; double value = 1;
@ -79,14 +63,6 @@ message Exemplar {
int64 timestamp = 3; int64 timestamp = 3;
} }
message MinExemplar {
// TODO: same as TimeSeries.labels_refs
repeated uint32 labels_refs = 1;
double value = 2;
// timestamp is in ms.
int64 timestamp = 3;
}
// A native histogram, also known as a sparse histogram. // A native histogram, also known as a sparse histogram.
// Original design doc: // Original design doc:
// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit // https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
@ -153,72 +129,6 @@ message BucketSpan {
} }
// A native histogram, also known as a sparse histogram.
// Original design doc:
// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
// The appendix of this design doc also explains the concept of float
// histograms. This Histogram message can represent both, the usual
// integer histogram as well as a float histogram.
message MinHistogram {
enum ResetHint {
UNKNOWN = 0; // Need to test for a counter reset explicitly.
YES = 1; // This is the 1st histogram after a counter reset.
NO = 2; // There was no counter reset between this and the previous Histogram.
GAUGE = 3; // This is a gauge histogram where counter resets don't happen.
}
oneof count { // Count of observations in the histogram.
uint64 count_int = 1;
double count_float = 2;
}
double sum = 3; // Sum of observations in the histogram.
// The schema defines the bucket schema. Currently, valid numbers
// are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1
// is a bucket boundary in each case, and then each power of two is
// divided into 2^n logarithmic buckets. Or in other words, each
// bucket boundary is the previous boundary times 2^(2^-n). In the
// future, more bucket schemas may be added using numbers < -4 or >
// 8.
sint32 schema = 4;
double zero_threshold = 5; // Breadth of the zero bucket.
oneof zero_count { // Count in zero bucket.
uint64 zero_count_int = 6;
double zero_count_float = 7;
}
// Negative Buckets.
repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_counts = 10; // Absolute count of each bucket.
// Positive Buckets.
repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_counts = 13; // Absolute count of each bucket.
ResetHint reset_hint = 14;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 15;
}
// A BucketSpan defines a number of consecutive buckets with their
// offset. Logically, it would be more straightforward to include the
// bucket counts in the Span. However, the protobuf representation is
// more compact in the way the data is structured here (with all the
// buckets in a single array separate from the Spans).
message MinBucketSpan {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}
// TimeSeries represents samples and labels for a single time series. // TimeSeries represents samples and labels for a single time series.
message TimeSeries { message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars // For a timeseries to be valid, and for the samples and exemplars
@ -229,19 +139,6 @@ message TimeSeries {
repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
} }
message MinimizedTimeSeriesStr {
// Sorted list of label name-value pair references, encoded as indices to a strings array.
// This list's len is always multiple of 2.
repeated uint32 label_symbols = 1;
// Sorted by time, oldest sample first.
repeated MinSample samples = 2 [(gogoproto.nullable) = false];
repeated MinExemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated MinHistogram histograms = 4 [(gogoproto.nullable) = false];
// TODO: add metadata
}
message Label { message Label {
string name = 1; string name = 1;
string value = 2; string value = 2;

174
prompb/write/v2/custom.go Normal file
View file

@ -0,0 +1,174 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writev2
import (
"golang.org/x/exp/slices"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
func (m *WriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
dst = make([]byte, siz)
}
dst = dst[:siz]
n, err := m.OptimizedMarshalToSizedBuffer(dst)
if err != nil {
return nil, err
}
return (dst)[:n], nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *WriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].OptimizedMarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.Symbols) > 0 {
for iNdEx := len(m.Symbols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Symbols[iNdEx])
copy(dAtA[i:], m.Symbols[iNdEx])
i = encodeVarintTypes(dAtA, i, uint64(len(m.Symbols[iNdEx])))
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but marshals m.LabelsRefs in place without extra allocations.
func (m *TimeSeries) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.CreatedTimestamp != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.CreatedTimestamp))
i--
dAtA[i] = 0x30
}
if m.Metadata != nil {
{
size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
}
if len(m.Histograms) > 0 {
for iNdEx := len(m.Histograms) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Histograms[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
}
if len(m.Exemplars) > 0 {
for iNdEx := len(m.Exemplars) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Exemplars[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.LabelsRefs) > 0 {
// This is the trick: encode the varints in reverse order to make it easier
// to do it in place. Then reverse the whole thing.
var j10 int
start := i
for _, num := range m.LabelsRefs {
for num >= 1<<7 {
dAtA[i-1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
i--
j10++
}
dAtA[i-1] = uint8(num)
i--
j10++
}
slices.Reverse(dAtA[i:start])
// --- end of trick
i = encodeVarintTypes(dAtA, i, uint64(j10))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}

View file

@ -10,7 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package prompb package writev2
import ( import (
"testing" "testing"
@ -23,18 +23,18 @@ func TestOptimizedMarshal(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
m *MinimizedWriteRequestStr m *WriteRequest
}{ }{
// { // {
// name: "empty", // name: "empty",
// m: &MinimizedWriteRequestStr{}, // m: &WriteRequest{},
// }, // },
{ {
name: "simple", name: "simple",
m: &MinimizedWriteRequestStr{ m: &WriteRequest{
Timeseries: []MinimizedTimeSeriesStr{ Timeseries: []TimeSeries{
{ {
LabelSymbols: []uint32{ LabelsRefs: []uint32{
0, 1, 0, 1,
2, 3, 2, 3,
4, 5, 4, 5,
@ -45,12 +45,12 @@ func TestOptimizedMarshal(t *testing.T) {
14, 15, 14, 15,
}, },
Samples: []MinSample{{Value: 1, Timestamp: 0}}, Samples: []Sample{{Value: 1, Timestamp: 0}},
Exemplars: []MinExemplar{{LabelsRefs: []uint32{0, 1}, Value: 1, Timestamp: 0}}, Exemplars: []Exemplar{{LabelsRefs: []uint32{0, 1}, Value: 1, Timestamp: 0}},
Histograms: nil, Histograms: nil,
}, },
{ {
LabelSymbols: []uint32{ LabelsRefs: []uint32{
0, 1, 0, 1,
2, 3, 2, 3,
4, 5, 4, 5,
@ -60,8 +60,8 @@ func TestOptimizedMarshal(t *testing.T) {
12, 13, 12, 13,
14, 15, 14, 15,
}, },
Samples: []MinSample{{Value: 2, Timestamp: 1}}, Samples: []Sample{{Value: 2, Timestamp: 1}},
Exemplars: []MinExemplar{{LabelsRefs: []uint32{0, 1}, Value: 2, Timestamp: 1}}, Exemplars: []Exemplar{{LabelsRefs: []uint32{0, 1}, Value: 2, Timestamp: 1}},
Histograms: nil, Histograms: nil,
}, },
}, },
@ -90,7 +90,7 @@ func TestOptimizedMarshal(t *testing.T) {
require.Equal(t, expected, got) require.Equal(t, expected, got)
// round trip // round trip
m := &MinimizedWriteRequestStr{} m := &WriteRequest{}
require.NoError(t, m.Unmarshal(got)) require.NoError(t, m.Unmarshal(got))
require.Equal(t, tt.m, m) require.Equal(t, tt.m, m)
}) })

3056
prompb/write/v2/types.pb.go Normal file

File diff suppressed because it is too large Load diff

141
prompb/write/v2/types.proto Normal file
View file

@ -0,0 +1,141 @@
// Copyright 2017 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package write.v2;
option go_package = "writev2";
import "gogoproto/gogo.proto";
message WriteRequest {
repeated string symbols = 1;
repeated TimeSeries timeseries = 2 [(gogoproto.nullable) = false];
}
message TimeSeries {
// Sorted list of label name-value pair references,
// encoded as indices to the strings array.
// This list's len is always multiple of 2.
repeated uint32 labels_refs = 1 ;
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
// Same as current version
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
Metadata metadata = 5;
// Optional created timestamp for the metric in ms format,
// if the first sample in samples does not contain 0 value.
// See model/timestamp/timestamp.go for conversion from time.Time
// to Prometheus timestamp.
int64 created_timestamp = 6;
}
message Exemplar {
// TODO: same as TimeSeries.labels_refs
repeated uint32 labels_refs = 1;
double value = 2;
// timestamp is in ms.
int64 timestamp = 3;
}
message Sample {
double value = 1;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 2;
}
message Metadata {
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
MetricType type = 1;
uint32 help_ref = 3;
uint32 unit_ref = 4;
}
// A native histogram, also known as a sparse histogram.
// Original design doc:
// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
// The appendix of this design doc also explains the concept of float
// histograms. This Histogram message can represent both, the usual
// integer histogram as well as a float histogram.
message Histogram {
enum ResetHint {
UNKNOWN = 0; // Need to test for a counter reset explicitly.
YES = 1; // This is the 1st histogram after a counter reset.
NO = 2; // There was no counter reset between this and the previous Histogram.
GAUGE = 3; // This is a gauge histogram where counter resets don't happen.
}
oneof count { // Count of observations in the histogram.
uint64 count_int = 1;
double count_float = 2;
}
double sum = 3; // Sum of observations in the histogram.
// The schema defines the bucket schema. Currently, valid numbers
// are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1
// is a bucket boundary in each case, and then each power of two is
// divided into 2^n logarithmic buckets. Or in other words, each
// bucket boundary is the previous boundary times 2^(2^-n). In the
// future, more bucket schemas may be added using numbers < -4 or >
// 8.
sint32 schema = 4;
double zero_threshold = 5; // Breadth of the zero bucket.
oneof zero_count { // Count in zero bucket.
uint64 zero_count_int = 6;
double zero_count_float = 7;
}
// Negative Buckets.
repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_counts = 10; // Absolute count of each bucket.
// Positive Buckets.
repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_counts = 13; // Absolute count of each bucket.
ResetHint reset_hint = 14;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 15;
}
// A BucketSpan defines a number of consecutive buckets with their
// offset. Logically, it would be more straightforward to include the
// bucket counts in the Span. However, the protobuf representation is
// more compact in the way the data is structured here (with all the
// buckets in a single array separate from the Spans).
message BucketSpan {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}

View file

@ -40,6 +40,9 @@ for dir in ${DIRS}; do
-I="${PROM_PATH}" \ -I="${PROM_PATH}" \
-I="${GRPC_GATEWAY_ROOT}/third_party/googleapis" \ -I="${GRPC_GATEWAY_ROOT}/third_party/googleapis" \
./*.proto ./*.proto
protoc --gogofast_out=plugins=grpc:. -I=. \
-I="${GOGOPROTO_PATH}" \
./write/v2/*.proto
protoc --gogofast_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,paths=source_relative:. -I=. \ protoc --gogofast_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,paths=source_relative:. -I=. \
-I="${GOGOPROTO_PATH}" \ -I="${GOGOPROTO_PATH}" \
./io/prometheus/client/*.proto ./io/prometheus/client/*.proto

View file

@ -35,6 +35,7 @@ import (
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
@ -628,7 +629,7 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
} }
} }
func minExemplarProtoToExemplar(ep prompb.MinExemplar, symbols []string) exemplar.Exemplar { func minExemplarProtoToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar {
timestamp := ep.Timestamp timestamp := ep.Timestamp
return exemplar.Exemplar{ return exemplar.Exemplar{
@ -703,7 +704,7 @@ func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogr
} }
} }
func FloatMinHistogramProtoToFloatHistogram(hp prompb.MinHistogram) *histogram.FloatHistogram { func FloatMinHistogramProtoToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() { if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram") panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
} }
@ -714,9 +715,9 @@ func FloatMinHistogramProtoToFloatHistogram(hp prompb.MinHistogram) *histogram.F
ZeroCount: hp.GetZeroCountFloat(), ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(), Count: hp.GetCountFloat(),
Sum: hp.Sum, Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), PositiveSpans: minSpansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(), PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), NegativeSpans: minSpansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(), NegativeBuckets: hp.GetNegativeCounts(),
} }
} }
@ -724,7 +725,7 @@ func FloatMinHistogramProtoToFloatHistogram(hp prompb.MinHistogram) *histogram.F
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the // HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message // provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics. // represents an integer histogram and not a float histogram, or it panics.
func MinHistogramProtoToHistogram(hp prompb.MinHistogram) *histogram.Histogram { func MinHistogramProtoToHistogram(hp writev2.Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram") panic("HistogramProtoToHistogram called with a float histogram")
} }
@ -735,9 +736,9 @@ func MinHistogramProtoToHistogram(hp prompb.MinHistogram) *histogram.Histogram {
ZeroCount: hp.GetZeroCountInt(), ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(), Count: hp.GetCountInt(),
Sum: hp.Sum, Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), PositiveSpans: minSpansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(), PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), NegativeSpans: minSpansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(), NegativeBuckets: hp.GetNegativeDeltas(),
} }
} }
@ -751,6 +752,15 @@ func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
return spans return spans
} }
func minSpansProtoToSpans(s []writev2.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
func deltasToCounts(deltas []int64) []float64 { func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas)) counts := make([]float64, len(deltas))
var cur float64 var cur float64
@ -777,18 +787,18 @@ func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.H
} }
} }
func HistogramToMinHistogramProto(timestamp int64, h *histogram.Histogram) prompb.MinHistogram { func HistogramToMinHistogramProto(timestamp int64, h *histogram.Histogram) writev2.Histogram {
return prompb.MinHistogram{ return writev2.Histogram{
Count: &prompb.MinHistogram_CountInt{CountInt: h.Count}, Count: &writev2.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum, Sum: h.Sum,
Schema: h.Schema, Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold, ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.MinHistogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, ZeroCount: &writev2.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans), NegativeSpans: spansToMinSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets, NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans), PositiveSpans: spansToMinSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets, PositiveDeltas: h.PositiveBuckets,
ResetHint: prompb.MinHistogram_ResetHint(h.CounterResetHint), ResetHint: writev2.Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp, Timestamp: timestamp,
} }
} }
@ -809,18 +819,18 @@ func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogra
} }
} }
func FloatHistogramToMinHistogramProto(timestamp int64, fh *histogram.FloatHistogram) prompb.MinHistogram { func FloatHistogramToMinHistogramProto(timestamp int64, fh *histogram.FloatHistogram) writev2.Histogram {
return prompb.MinHistogram{ return writev2.Histogram{
Count: &prompb.MinHistogram_CountFloat{CountFloat: fh.Count}, Count: &writev2.Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum, Sum: fh.Sum,
Schema: fh.Schema, Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold, ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &prompb.MinHistogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, ZeroCount: &writev2.Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans), NegativeSpans: spansToMinSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets, NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans), PositiveSpans: spansToMinSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets, PositiveCounts: fh.PositiveBuckets,
ResetHint: prompb.MinHistogram_ResetHint(fh.CounterResetHint), ResetHint: writev2.Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp, Timestamp: timestamp,
} }
} }
@ -834,6 +844,15 @@ func spansToSpansProto(s []histogram.Span) []prompb.BucketSpan {
return spans return spans
} }
func spansToMinSpansProto(s []histogram.Span) []writev2.BucketSpan {
spans := make([]writev2.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = writev2.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans
}
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric. // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric.
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
metric := make(model.Metric, len(labelPairs)) metric := make(model.Metric, len(labelPairs))
@ -980,7 +999,7 @@ func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
return otlpReq, nil return otlpReq, nil
} }
func DecodeMinimizedWriteRequestStr(r io.Reader) (*prompb.MinimizedWriteRequestStr, error) { func DecodeMinimizedWriteRequestStr(r io.Reader) (*writev2.WriteRequest, error) {
compressed, err := io.ReadAll(r) compressed, err := io.ReadAll(r)
if err != nil { if err != nil {
return nil, err return nil, err
@ -991,7 +1010,7 @@ func DecodeMinimizedWriteRequestStr(r io.Reader) (*prompb.MinimizedWriteRequestS
return nil, err return nil, err
} }
var req prompb.MinimizedWriteRequestStr var req writev2.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil { if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err return nil, err
} }
@ -999,14 +1018,14 @@ func DecodeMinimizedWriteRequestStr(r io.Reader) (*prompb.MinimizedWriteRequestS
return &req, nil return &req, nil
} }
func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequestStr) (*prompb.WriteRequest, error) { func MinimizedWriteRequestToWriteRequest(redReq *writev2.WriteRequest) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{ req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)), Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
// TODO handle metadata? // TODO handle metadata?
} }
for i, rts := range redReq.Timeseries { for i, rts := range redReq.Timeseries {
Uint32StrRefToLabels(redReq.Symbols, rts.LabelSymbols).Range(func(l labels.Label) { Uint32StrRefToLabels(redReq.Symbols, rts.LabelsRefs).Range(func(l labels.Label) {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{ req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
Name: l.Name, Name: l.Name,
Value: l.Value, Value: l.Value,
@ -1042,13 +1061,25 @@ func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequestStr
req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()} req.Timeseries[i].Histograms[j].Count = &prompb.Histogram_CountInt{CountInt: h.GetCountInt()}
req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()} req.Timeseries[i].Histograms[j].ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
} }
for _, span := range h.NegativeSpans {
req.Timeseries[i].Histograms[j].NegativeSpans = append(req.Timeseries[i].Histograms[j].NegativeSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
for _, span := range h.PositiveSpans {
req.Timeseries[i].Histograms[j].PositiveSpans = append(req.Timeseries[i].Histograms[j].PositiveSpans, prompb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
})
}
req.Timeseries[i].Histograms[j].Sum = h.Sum req.Timeseries[i].Histograms[j].Sum = h.Sum
req.Timeseries[i].Histograms[j].Schema = h.Schema req.Timeseries[i].Histograms[j].Schema = h.Schema
req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold req.Timeseries[i].Histograms[j].ZeroThreshold = h.ZeroThreshold
req.Timeseries[i].Histograms[j].NegativeSpans = h.NegativeSpans
req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas req.Timeseries[i].Histograms[j].NegativeDeltas = h.NegativeDeltas
req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts req.Timeseries[i].Histograms[j].NegativeCounts = h.NegativeCounts
req.Timeseries[i].Histograms[j].PositiveSpans = h.PositiveSpans
req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas req.Timeseries[i].Histograms[j].PositiveDeltas = h.PositiveDeltas
req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts req.Timeseries[i].Histograms[j].PositiveCounts = h.PositiveCounts
req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint) req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint)

View file

@ -26,6 +26,7 @@ import (
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
@ -75,7 +76,7 @@ var writeRequestFixture = &prompb.WriteRequest{
} }
// writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation. // writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation.
var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequestStr { var writeRequestMinimizedFixture = func() *writev2.WriteRequest {
st := newRwSymbolTable() st := newRwSymbolTable()
var labels []uint32 var labels []uint32
for _, s := range []string{ for _, s := range []string{
@ -95,19 +96,19 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequestStr {
st.RefStr(s) st.RefStr(s)
} }
return &prompb.MinimizedWriteRequestStr{ return &writev2.WriteRequest{
Timeseries: []prompb.MinimizedTimeSeriesStr{ Timeseries: []writev2.TimeSeries{
{ {
LabelSymbols: labels, LabelsRefs: labels,
Samples: []prompb.MinSample{{Value: 1, Timestamp: 0}}, Samples: []writev2.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.MinExemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}}, Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}},
Histograms: []prompb.MinHistogram{HistogramToMinHistogramProto(0, &testHistogram), FloatHistogramToMinHistogramProto(1, testHistogram.ToFloat(nil))}, Histograms: []writev2.Histogram{HistogramToMinHistogramProto(0, &testHistogram), FloatHistogramToMinHistogramProto(1, testHistogram.ToFloat(nil))},
}, },
{ {
LabelSymbols: labels, LabelsRefs: labels,
Samples: []prompb.MinSample{{Value: 2, Timestamp: 1}}, Samples: []writev2.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.MinExemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}}, Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}},
Histograms: []prompb.MinHistogram{HistogramToMinHistogramProto(2, &testHistogram), FloatHistogramToMinHistogramProto(3, testHistogram.ToFloat(nil))}, Histograms: []writev2.Histogram{HistogramToMinHistogramProto(2, &testHistogram), FloatHistogramToMinHistogramProto(3, testHistogram.ToFloat(nil))},
}, },
}, },
Symbols: st.LabelsStrings(), Symbols: st.LabelsStrings(),

View file

@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
@ -1379,9 +1380,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
pendingMinStrData := make([]prompb.MinimizedTimeSeriesStr, max) pendingMinStrData := make([]writev2.TimeSeries, max)
for i := range pendingMinStrData { for i := range pendingMinStrData {
pendingMinStrData[i].Samples = []prompb.MinSample{{}} pendingMinStrData[i].Samples = []writev2.Sample{{}}
} }
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1510,7 +1511,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
} }
func (s *shards) sendMinStrSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesStr, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) { func (s *shards) sendMinStrSamples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
begin := time.Now() begin := time.Now()
// Build the ReducedWriteRequest with no metadata. // Build the ReducedWriteRequest with no metadata.
// Failing to build the write request is non-recoverable, since it will // Failing to build the write request is non-recoverable, since it will
@ -1604,7 +1605,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
return err return err
} }
func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []prompb.MinimizedTimeSeriesStr, sendExemplars, sendNativeHistograms bool) (int, int, int) { func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@ -1620,16 +1621,16 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri
// stop reading from the queue. This makes it safe to reference pendingSamples by index. // stop reading from the queue. This makes it safe to reference pendingSamples by index.
// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) // pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].LabelSymbols = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelSymbols) pendingData[nPending].LabelsRefs = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelsRefs)
switch d.sType { switch d.sType {
case tSample: case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.MinSample{ pendingData[nPending].Samples = append(pendingData[nPending].Samples, writev2.Sample{
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })
nPendingSamples++ nPendingSamples++
case tExemplar: case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.MinExemplar{ pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, writev2.Exemplar{
LabelsRefs: labelsToUint32SliceStr(d.exemplarLabels, symbolTable, nil), // TODO: optimize, reuse slice LabelsRefs: labelsToUint32SliceStr(d.exemplarLabels, symbolTable, nil), // TODO: optimize, reuse slice
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
@ -1776,7 +1777,7 @@ func (r *rwSymbolTable) clear() {
} }
} }
func buildMinimizedWriteRequestStr(samples []prompb.MinimizedTimeSeriesStr, labels []string, pBuf, buf *[]byte) ([]byte, int64, error) { func buildMinimizedWriteRequestStr(samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte) ([]byte, int64, error) {
var highest int64 var highest int64
for _, ts := range samples { for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it. // At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
@ -1791,7 +1792,7 @@ func buildMinimizedWriteRequestStr(samples []prompb.MinimizedTimeSeriesStr, labe
} }
} }
req := &prompb.MinimizedWriteRequestStr{ req := &writev2.WriteRequest{
Symbols: labels, Symbols: labels,
Timeseries: samples, Timeseries: samples,
} }

View file

@ -42,6 +42,7 @@ import (
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
@ -831,7 +832,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
reqProto = &prompb.WriteRequest{} reqProto = &prompb.WriteRequest{}
err = proto.Unmarshal(reqBuf, reqProto) err = proto.Unmarshal(reqBuf, reqProto)
case MinStrings: case MinStrings:
var reqMin prompb.MinimizedWriteRequestStr var reqMin writev2.WriteRequest
err = proto.Unmarshal(reqBuf, &reqMin) err = proto.Unmarshal(reqBuf, &reqMin)
if err == nil { if err == nil {
reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin) reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin)
@ -1503,10 +1504,10 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
for _, tc := range testCases { for _, tc := range testCases {
symbolTable := newRwSymbolTable() symbolTable := newRwSymbolTable()
buff := make([]byte, 0) buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeriesStr, len(tc.batch)) seriesBuff := make([]writev2.TimeSeries, len(tc.batch))
for i := range seriesBuff { for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.MinSample{{}} seriesBuff[i].Samples = []writev2.Sample{{}}
seriesBuff[i].Exemplars = []prompb.MinExemplar{{}} seriesBuff[i].Exemplars = []writev2.Exemplar{{}}
} }
pBuf := []byte{} pBuf := []byte{}

View file

@ -20,6 +20,7 @@ import (
"net/http" "net/http"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
@ -73,7 +74,7 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error var err error
var req *prompb.WriteRequest var req *prompb.WriteRequest
var reqMinStr *prompb.MinimizedWriteRequestStr var reqMinStr *writev2.WriteRequest
// TODO: this should eventually be done via content negotiation/looking at the header // TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat { switch h.rwFormat {
@ -204,7 +205,7 @@ func (h *writeHandler) appendSamples(app storage.Appender, ss []prompb.Sample, l
return nil return nil
} }
func (h *writeHandler) appendMinSamples(app storage.Appender, ss []prompb.MinSample, labels labels.Labels) error { func (h *writeHandler) appendMinSamples(app storage.Appender, ss []writev2.Sample, labels labels.Labels) error {
var ref storage.SeriesRef var ref storage.SeriesRef
var err error var err error
for _, s := range ss { for _, s := range ss {
@ -249,7 +250,7 @@ func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histog
return nil return nil
} }
func (h *writeHandler) appendMinHistograms(app storage.Appender, hh []prompb.MinHistogram, labels labels.Labels) error { func (h *writeHandler) appendMinHistograms(app storage.Appender, hh []writev2.Histogram, labels labels.Labels) error {
var err error var err error
for _, hp := range hh { for _, hp := range hh {
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {
@ -334,7 +335,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWriteRequestStr) (err error) { func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteRequest) (err error) {
outOfOrderExemplarErrs := 0 outOfOrderExemplarErrs := 0
app := h.appendable.Appender(ctx) app := h.appendable.Appender(ctx)
@ -347,7 +348,7 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *prompb.MinimizedWri
}() }()
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
ls := Uint32StrRefToLabels(req.Symbols, ts.LabelSymbols) ls := Uint32StrRefToLabels(req.Symbols, ts.LabelsRefs)
err := h.appendMinSamples(app, ts.Samples, ls) err := h.appendMinSamples(app, ts.Samples, ls)
if err != nil { if err != nil {