From 46943c6bda03132cf9527c8b849fad4932fa5f75 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Thu, 20 Jun 2019 17:59:23 +0100 Subject: [PATCH] remote-read: Extended remote read proto with negotiation and chunked response. Context: https://docs.google.com/document/d/1JqrU3NjM9HoGLSTPYOvR217f5HBKBiJTqikEB9UiJL0/edit# Signed-off-by: Bartek Plotka Fixed proto comments after reviews. Signed-off-by: Bartek Plotka Fixed one more comment typo. Signed-off-by: Bartek Plotka Moved response_type to accepted_response_types. Signed-off-by: Bartek Plotka Fixed typo in comment. Signed-off-by: Bartek Plotka Addressed comments. Signed-off-by: Bartek Plotka Added requirements for ChunkedSeries fields. Signed-off-by: Bartek Plotka --- prompb/remote.pb.go | 431 ++++++++++++++++++++++++++++-- prompb/remote.proto | 33 +++ prompb/types.pb.go | 620 ++++++++++++++++++++++++++++++++++++++++++-- prompb/types.proto | 23 ++ web/api/v1/api.go | 6 + 5 files changed, 1060 insertions(+), 53 deletions(-) diff --git a/prompb/remote.pb.go b/prompb/remote.pb.go index b8608b5156..897b419574 100644 --- a/prompb/remote.pb.go +++ b/prompb/remote.pb.go @@ -23,6 +23,33 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package +type ReadRequest_ResponseType int32 + +const ( + // Server will stream a varint delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series. + // + // Response headers: + // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" + // Content-Encoding: "" + ReadRequest_STREAMED_XOR_CHUNKS ReadRequest_ResponseType = 0 +) + +var ReadRequest_ResponseType_name = map[int32]string{ + 0: "STREAMED_XOR_CHUNKS", +} + +var ReadRequest_ResponseType_value = map[string]int32{ + "STREAMED_XOR_CHUNKS": 0, +} + +func (x ReadRequest_ResponseType) String() string { + return proto.EnumName(ReadRequest_ResponseType_name, int32(x)) +} + +func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_eefc82927d57d89b, []int{1, 0} +} + type WriteRequest struct { Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -70,11 +97,22 @@ func (m *WriteRequest) GetTimeseries() []TimeSeries { return nil } +// ReadRequest represents a remote read request. type ReadRequest struct { - Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` + // accepted_response_types allows negotiating the content type of the response. + // + // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is + // implemented by server, error is returned. + // For request that do not support `accepted_response_types` field the non streamed, raw samples response is used. + // In such case response headers are as follows: + // + // Content-Type: "application/x-protobuf" + // Content-Encoding: "snappy" + AcceptedResponseTypes []ReadRequest_ResponseType `protobuf:"varint,2,rep,packed,name=accepted_response_types,json=acceptedResponseTypes,proto3,enum=prometheus.ReadRequest_ResponseType" json:"accepted_response_types,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ReadRequest) Reset() { *m = ReadRequest{} } @@ -117,6 +155,14 @@ func (m *ReadRequest) GetQueries() []*Query { return nil } +func (m *ReadRequest) GetAcceptedResponseTypes() []ReadRequest_ResponseType { + if m != nil { + return m.AcceptedResponseTypes + } + return nil +} + +// ReadResponse is a response when response_type equals SAMPLES. type ReadResponse struct { // In same order as the request's queries. Results []*QueryResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` @@ -284,39 +330,109 @@ func (m *QueryResult) GetTimeseries() []*TimeSeries { return nil } +// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. +// We strictly stream full series after series, optionally split by time. This means that a single frame can contain +// partition of the single series, but once a new series is started to be streamed it means that no more chunks will +// be sent for previous one. +type ChunkedReadResponse struct { + ChunkedSeries []*ChunkedSeries `protobuf:"bytes,1,rep,name=chunked_series,json=chunkedSeries,proto3" json:"chunked_series,omitempty"` + // query_index represents an index of the query from ReadRequest.queries this results relates to. + QueryIndex int64 `protobuf:"varint,2,opt,name=query_index,json=queryIndex,proto3" json:"query_index,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ChunkedReadResponse) Reset() { *m = ChunkedReadResponse{} } +func (m *ChunkedReadResponse) String() string { return proto.CompactTextString(m) } +func (*ChunkedReadResponse) ProtoMessage() {} +func (*ChunkedReadResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_eefc82927d57d89b, []int{5} +} +func (m *ChunkedReadResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ChunkedReadResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ChunkedReadResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ChunkedReadResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ChunkedReadResponse.Merge(m, src) +} +func (m *ChunkedReadResponse) XXX_Size() int { + return m.Size() +} +func (m *ChunkedReadResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ChunkedReadResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ChunkedReadResponse proto.InternalMessageInfo + +func (m *ChunkedReadResponse) GetChunkedSeries() []*ChunkedSeries { + if m != nil { + return m.ChunkedSeries + } + return nil +} + +func (m *ChunkedReadResponse) GetQueryIndex() int64 { + if m != nil { + return m.QueryIndex + } + return 0 +} + func init() { + proto.RegisterEnum("prometheus.ReadRequest_ResponseType", ReadRequest_ResponseType_name, ReadRequest_ResponseType_value) proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") proto.RegisterType((*Query)(nil), "prometheus.Query") proto.RegisterType((*QueryResult)(nil), "prometheus.QueryResult") + proto.RegisterType((*ChunkedReadResponse)(nil), "prometheus.ChunkedReadResponse") } func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) } var fileDescriptor_eefc82927d57d89b = []byte{ - // 333 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xbf, 0x4a, 0x2b, 0x41, - 0x14, 0xc6, 0xef, 0xdc, 0xfc, 0xbb, 0x9c, 0x0d, 0x97, 0xdc, 0x21, 0x57, 0x97, 0x14, 0x31, 0x6c, - 0xb5, 0x10, 0x89, 0x18, 0xc5, 0x42, 0x6c, 0x0c, 0x08, 0x16, 0x49, 0xe1, 0x18, 0x10, 0x6c, 0xc2, - 0xc6, 0x1c, 0x92, 0x85, 0xcc, 0xce, 0x66, 0xe6, 0x6c, 0x91, 0xd7, 0xb3, 0x4a, 0xe9, 0x13, 0x88, - 0xe4, 0x49, 0x64, 0x67, 0x59, 0x1d, 0xb1, 0xb1, 0x1b, 0xe6, 0xf7, 0xfb, 0x3e, 0xce, 0xe1, 0x40, - 0x53, 0xa3, 0x54, 0x84, 0x83, 0x54, 0x2b, 0x52, 0x1c, 0x52, 0xad, 0x24, 0xd2, 0x0a, 0x33, 0xd3, - 0xf1, 0x68, 0x9b, 0xa2, 0x29, 0x40, 0xa7, 0xbd, 0x54, 0x4b, 0x65, 0x9f, 0x27, 0xf9, 0xab, 0xf8, - 0x0d, 0xc6, 0xd0, 0x7c, 0xd0, 0x31, 0xa1, 0xc0, 0x4d, 0x86, 0x86, 0xf8, 0x15, 0x00, 0xc5, 0x12, - 0x0d, 0xea, 0x18, 0x8d, 0xcf, 0x7a, 0x95, 0xd0, 0x1b, 0x1e, 0x0c, 0x3e, 0x3b, 0x07, 0xd3, 0x58, - 0xe2, 0xbd, 0xa5, 0xa3, 0xea, 0xee, 0xf5, 0xe8, 0x97, 0x70, 0xfc, 0xe0, 0x12, 0x3c, 0x81, 0xd1, - 0xa2, 0x2c, 0xeb, 0x43, 0x63, 0x93, 0xb9, 0x4d, 0xff, 0xdc, 0xa6, 0xbb, 0x0c, 0xf5, 0x56, 0x94, - 0x46, 0x70, 0x0d, 0xcd, 0x22, 0x6b, 0x52, 0x95, 0x18, 0xe4, 0xa7, 0xd0, 0xd0, 0x68, 0xb2, 0x35, - 0x95, 0xe1, 0xc3, 0xef, 0x61, 0xcb, 0x45, 0xe9, 0x05, 0xcf, 0x0c, 0x6a, 0x16, 0xf0, 0x63, 0xe0, - 0x86, 0x22, 0x4d, 0x33, 0x3b, 0x1c, 0x45, 0x32, 0x9d, 0xc9, 0xbc, 0x87, 0x85, 0x15, 0xd1, 0xb2, - 0x64, 0x5a, 0x82, 0x89, 0xe1, 0x21, 0xb4, 0x30, 0x59, 0x7c, 0x75, 0x7f, 0x5b, 0xf7, 0x2f, 0x26, - 0x0b, 0xd7, 0x3c, 0x87, 0x3f, 0x32, 0xa2, 0xa7, 0x15, 0x6a, 0xe3, 0x57, 0xec, 0x54, 0xbe, 0x3b, - 0xd5, 0x38, 0x9a, 0xe3, 0x7a, 0x52, 0x08, 0xe2, 0xc3, 0xe4, 0x7d, 0xa8, 0xad, 0xe2, 0x84, 0x8c, - 0x5f, 0xed, 0xb1, 0xd0, 0x1b, 0xfe, 0x77, 0x23, 0xf9, 0xce, 0xb7, 0x39, 0x14, 0x85, 0x13, 0xdc, - 0x80, 0xe7, 0x2c, 0xc7, 0x2f, 0x7e, 0x7e, 0x10, 0xf7, 0x14, 0xa3, 0xf6, 0x6e, 0xdf, 0x65, 0x2f, - 0xfb, 0x2e, 0x7b, 0xdb, 0x77, 0xd9, 0x63, 0x3d, 0x0f, 0xa4, 0xf3, 0x79, 0xdd, 0x5e, 0xfd, 0xec, - 0x3d, 0x00, 0x00, 0xff, 0xff, 0x9e, 0xb6, 0x05, 0x1c, 0x34, 0x02, 0x00, 0x00, + // 454 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xdf, 0x6e, 0xd3, 0x30, + 0x14, 0xc6, 0xe7, 0x75, 0x7f, 0xd0, 0x49, 0xa9, 0x8a, 0xb7, 0xd1, 0xb0, 0x8b, 0xae, 0x8a, 0x90, + 0x88, 0x34, 0x54, 0x44, 0x41, 0x5c, 0x71, 0xc1, 0x36, 0x2a, 0x0d, 0xb1, 0x82, 0x70, 0x8b, 0x40, + 0x08, 0xc9, 0x4a, 0x93, 0xa3, 0x35, 0x62, 0x49, 0x3c, 0xdb, 0x91, 0xd6, 0xc7, 0x83, 0xab, 0x5d, + 0xf2, 0x04, 0x08, 0xf5, 0x49, 0x90, 0x9d, 0x66, 0xb8, 0x70, 0xb3, 0x3b, 0xe7, 0xfb, 0x7e, 0xe7, + 0x8b, 0xcf, 0xf1, 0x81, 0xa6, 0xc4, 0xac, 0xd0, 0xd8, 0x17, 0xb2, 0xd0, 0x05, 0x05, 0x21, 0x8b, + 0x0c, 0xf5, 0x0c, 0x4b, 0xb5, 0xef, 0xe9, 0xb9, 0x40, 0x55, 0x19, 0xfb, 0xbb, 0xe7, 0xc5, 0x79, + 0x61, 0x8f, 0x4f, 0xcc, 0xa9, 0x52, 0x83, 0x33, 0x68, 0x7e, 0x92, 0xa9, 0x46, 0x86, 0x97, 0x25, + 0x2a, 0x4d, 0x5f, 0x02, 0xe8, 0x34, 0x43, 0x85, 0x32, 0x45, 0xe5, 0x93, 0x5e, 0x23, 0xf4, 0x06, + 0xf7, 0xfb, 0x7f, 0x33, 0xfb, 0x93, 0x34, 0xc3, 0xb1, 0x75, 0x8f, 0x37, 0xae, 0x7f, 0x1d, 0xac, + 0x31, 0x87, 0x0f, 0xbe, 0x13, 0xf0, 0x18, 0x46, 0x49, 0x9d, 0x76, 0x08, 0xdb, 0x97, 0xa5, 0x1b, + 0x75, 0xcf, 0x8d, 0xfa, 0x50, 0xa2, 0x9c, 0xb3, 0x9a, 0xa0, 0x5f, 0xa1, 0x13, 0xc5, 0x31, 0x0a, + 0x8d, 0x09, 0x97, 0xa8, 0x44, 0x91, 0x2b, 0xe4, 0xb6, 0x03, 0x7f, 0xbd, 0xd7, 0x08, 0x5b, 0x83, + 0x87, 0x6e, 0xb1, 0xf3, 0x9b, 0x3e, 0x5b, 0xd2, 0x93, 0xb9, 0x40, 0xb6, 0x57, 0x87, 0xb8, 0xaa, + 0x0a, 0x1e, 0x41, 0xd3, 0x15, 0x68, 0x07, 0x76, 0xc6, 0x13, 0x36, 0x3c, 0x1a, 0x0d, 0x5f, 0xf3, + 0xcf, 0xef, 0x19, 0x3f, 0x39, 0xfd, 0xf8, 0xee, 0xed, 0xb8, 0xbd, 0x16, 0x1c, 0x19, 0x30, 0xba, + 0xa9, 0xa6, 0x4f, 0x61, 0x5b, 0xa2, 0x2a, 0x2f, 0x74, 0xdd, 0x43, 0xe7, 0xff, 0x1e, 0xac, 0xcf, + 0x6a, 0x2e, 0xf8, 0x41, 0x60, 0xd3, 0x1a, 0xf4, 0x31, 0x50, 0xa5, 0x23, 0xa9, 0xb9, 0x1d, 0x92, + 0x8e, 0x32, 0xc1, 0x33, 0x93, 0x43, 0xc2, 0x06, 0x6b, 0x5b, 0x67, 0x52, 0x1b, 0x23, 0x45, 0x43, + 0x68, 0x63, 0x9e, 0xac, 0xb2, 0xeb, 0x96, 0x6d, 0x61, 0x9e, 0xb8, 0xe4, 0x73, 0xb8, 0x93, 0x45, + 0x3a, 0x9e, 0xa1, 0x54, 0x7e, 0xc3, 0xde, 0xca, 0x77, 0x6f, 0x75, 0x16, 0x4d, 0xf1, 0x62, 0x54, + 0x01, 0xec, 0x86, 0xa4, 0x87, 0xb0, 0x39, 0x4b, 0x73, 0xad, 0xfc, 0x8d, 0x1e, 0x09, 0xbd, 0xc1, + 0xde, 0xbf, 0xf3, 0x3c, 0x35, 0x26, 0xab, 0x98, 0x60, 0x08, 0x9e, 0xd3, 0x1c, 0x7d, 0x71, 0xfb, + 0xc5, 0x58, 0x59, 0x89, 0x2b, 0xd8, 0x39, 0x99, 0x95, 0xf9, 0x37, 0xf3, 0x1e, 0xce, 0x54, 0x5f, + 0x41, 0x2b, 0xae, 0x64, 0xbe, 0x12, 0xf9, 0xc0, 0x8d, 0x5c, 0x16, 0x2e, 0x53, 0xef, 0xc6, 0xee, + 0x27, 0x3d, 0x00, 0xcf, 0x6c, 0xce, 0x9c, 0xa7, 0x79, 0x82, 0x57, 0xcb, 0x39, 0x81, 0x95, 0xde, + 0x18, 0xe5, 0x78, 0xf7, 0x7a, 0xd1, 0x25, 0x3f, 0x17, 0x5d, 0xf2, 0x7b, 0xd1, 0x25, 0x5f, 0xb6, + 0x4c, 0xae, 0x98, 0x4e, 0xb7, 0xec, 0xde, 0x3f, 0xfb, 0x13, 0x00, 0x00, 0xff, 0xff, 0x94, 0xd0, + 0x75, 0xe3, 0x36, 0x03, 0x00, 0x00, } func (m *WriteRequest) Marshal() (dAtA []byte, err error) { @@ -379,6 +495,23 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { i += n } } + if len(m.AcceptedResponseTypes) > 0 { + dAtA2 := make([]byte, len(m.AcceptedResponseTypes)*10) + var j1 int + for _, num := range m.AcceptedResponseTypes { + for num >= 1<<7 { + dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j1++ + } + dAtA2[j1] = uint8(num) + j1++ + } + dAtA[i] = 0x12 + i++ + i = encodeVarintRemote(dAtA, i, uint64(j1)) + i += copy(dAtA[i:], dAtA2[:j1]) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -459,11 +592,11 @@ func (m *Query) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x22 i++ i = encodeVarintRemote(dAtA, i, uint64(m.Hints.Size())) - n1, err := m.Hints.MarshalTo(dAtA[i:]) + n3, err := m.Hints.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n1 + i += n3 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -504,6 +637,44 @@ func (m *QueryResult) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *ChunkedReadResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ChunkedReadResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ChunkedSeries) > 0 { + for _, msg := range m.ChunkedSeries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.QueryIndex != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.QueryIndex)) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -543,6 +714,13 @@ func (m *ReadRequest) Size() (n int) { n += 1 + l + sovRemote(uint64(l)) } } + if len(m.AcceptedResponseTypes) > 0 { + l = 0 + for _, e := range m.AcceptedResponseTypes { + l += sovRemote(uint64(e)) + } + n += 1 + sovRemote(uint64(l)) + l + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -613,6 +791,27 @@ func (m *QueryResult) Size() (n int) { return n } +func (m *ChunkedReadResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.ChunkedSeries) > 0 { + for _, e := range m.ChunkedSeries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + if m.QueryIndex != 0 { + n += 1 + sovRemote(uint64(m.QueryIndex)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovRemote(x uint64) (n int) { for { n++ @@ -777,6 +976,75 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType == 0 { + var v ReadRequest_ResponseType + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= ReadRequest_ResponseType(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.AcceptedResponseTypes = append(m.AcceptedResponseTypes, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthRemote + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + if elementCount != 0 && len(m.AcceptedResponseTypes) == 0 { + m.AcceptedResponseTypes = make([]ReadRequest_ResponseType, 0, elementCount) + } + for iNdEx < postIndex { + var v ReadRequest_ResponseType + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= ReadRequest_ResponseType(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.AcceptedResponseTypes = append(m.AcceptedResponseTypes, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field AcceptedResponseTypes", wireType) + } default: iNdEx = preIndex skippy, err := skipRemote(dAtA[iNdEx:]) @@ -1140,6 +1408,113 @@ func (m *QueryResult) Unmarshal(dAtA []byte) error { } return nil } +func (m *ChunkedReadResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ChunkedReadResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ChunkedReadResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ChunkedSeries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRemote + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ChunkedSeries = append(m.ChunkedSeries, &ChunkedSeries{}) + if err := m.ChunkedSeries[len(m.ChunkedSeries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field QueryIndex", wireType) + } + m.QueryIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.QueryIndex |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRemote(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/prompb/remote.proto b/prompb/remote.proto index cf86f0dd5a..10c9c2c0a6 100644 --- a/prompb/remote.proto +++ b/prompb/remote.proto @@ -23,10 +23,32 @@ message WriteRequest { repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; } +// ReadRequest represents a remote read request. message ReadRequest { repeated Query queries = 1; + + enum ResponseType { + // Server will stream a varint delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series. + // + // Response headers: + // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" + // Content-Encoding: "" + STREAMED_XOR_CHUNKS = 0; + } + + // accepted_response_types allows negotiating the content type of the response. + // + // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is + // implemented by server, error is returned. + // For request that do not support `accepted_response_types` field the non streamed, raw samples response is used. + // In such case response headers are as follows: + // + // Content-Type: "application/x-protobuf" + // Content-Encoding: "snappy" + repeated ResponseType accepted_response_types = 2; } +// ReadResponse is a response when response_type equals SAMPLES. message ReadResponse { // In same order as the request's queries. repeated QueryResult results = 1; @@ -43,3 +65,14 @@ message QueryResult { // Samples within a time series must be ordered by time. repeated prometheus.TimeSeries timeseries = 1; } + +// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. +// We strictly stream full series after series, optionally split by time. This means that a single frame can contain +// partition of the single series, but once a new series is started to be streamed it means that no more chunks will +// be sent for previous one. +message ChunkedReadResponse { + repeated prometheus.ChunkedSeries chunked_series = 1; + + // query_index represents an index of the query from ReadRequest.queries this results relates to. + int64 query_index = 2; +} diff --git a/prompb/types.pb.go b/prompb/types.pb.go index 65de3e70b7..6107513faf 100644 --- a/prompb/types.pb.go +++ b/prompb/types.pb.go @@ -55,6 +55,32 @@ func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor_d938547f84707355, []int{4, 0} } +// We require this to match chunkenc.Encoding. +type Chunk_Encoding int32 + +const ( + Chunk_UNKNOWN Chunk_Encoding = 0 + Chunk_XOR Chunk_Encoding = 1 +) + +var Chunk_Encoding_name = map[int32]string{ + 0: "UNKNOWN", + 1: "XOR", +} + +var Chunk_Encoding_value = map[string]int32{ + "UNKNOWN": 0, + "XOR": 1, +} + +func (x Chunk_Encoding) String() string { + return proto.EnumName(Chunk_Encoding_name, int32(x)) +} + +func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{6, 0} +} + type Sample struct { Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` @@ -110,6 +136,7 @@ func (m *Sample) GetTimestamp() int64 { return 0 } +// TimeSeries represents samples and labels for a single time series. type TimeSeries struct { Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` @@ -402,44 +429,184 @@ func (m *ReadHints) GetEndMs() int64 { return 0 } +// Chunk represents a TSDB chunk. +type Chunk struct { + MinTimeMs int64 `protobuf:"varint,1,opt,name=min_time_ms,json=minTimeMs,proto3" json:"min_time_ms,omitempty"` + MaxTimeMs int64 `protobuf:"varint,2,opt,name=max_time_ms,json=maxTimeMs,proto3" json:"max_time_ms,omitempty"` + Type Chunk_Encoding `protobuf:"varint,3,opt,name=type,proto3,enum=prometheus.Chunk_Encoding" json:"type,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Chunk) Reset() { *m = Chunk{} } +func (m *Chunk) String() string { return proto.CompactTextString(m) } +func (*Chunk) ProtoMessage() {} +func (*Chunk) Descriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{6} +} +func (m *Chunk) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Chunk.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Chunk) XXX_Merge(src proto.Message) { + xxx_messageInfo_Chunk.Merge(m, src) +} +func (m *Chunk) XXX_Size() int { + return m.Size() +} +func (m *Chunk) XXX_DiscardUnknown() { + xxx_messageInfo_Chunk.DiscardUnknown(m) +} + +var xxx_messageInfo_Chunk proto.InternalMessageInfo + +func (m *Chunk) GetMinTimeMs() int64 { + if m != nil { + return m.MinTimeMs + } + return 0 +} + +func (m *Chunk) GetMaxTimeMs() int64 { + if m != nil { + return m.MaxTimeMs + } + return 0 +} + +func (m *Chunk) GetType() Chunk_Encoding { + if m != nil { + return m.Type + } + return Chunk_UNKNOWN +} + +func (m *Chunk) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +// ChunkedSeries represents single, encoded time series. +type ChunkedSeries struct { + // labels should be sorted. + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + // chunks should be sorted should not overlap in time. + Chunks []Chunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ChunkedSeries) Reset() { *m = ChunkedSeries{} } +func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) } +func (*ChunkedSeries) ProtoMessage() {} +func (*ChunkedSeries) Descriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{7} +} +func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ChunkedSeries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ChunkedSeries.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ChunkedSeries) XXX_Merge(src proto.Message) { + xxx_messageInfo_ChunkedSeries.Merge(m, src) +} +func (m *ChunkedSeries) XXX_Size() int { + return m.Size() +} +func (m *ChunkedSeries) XXX_DiscardUnknown() { + xxx_messageInfo_ChunkedSeries.DiscardUnknown(m) +} + +var xxx_messageInfo_ChunkedSeries proto.InternalMessageInfo + +func (m *ChunkedSeries) GetLabels() []Label { + if m != nil { + return m.Labels + } + return nil +} + +func (m *ChunkedSeries) GetChunks() []Chunk { + if m != nil { + return m.Chunks + } + return nil +} + func init() { proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) + proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value) proto.RegisterType((*Sample)(nil), "prometheus.Sample") proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") proto.RegisterType((*Label)(nil), "prometheus.Label") proto.RegisterType((*Labels)(nil), "prometheus.Labels") proto.RegisterType((*LabelMatcher)(nil), "prometheus.LabelMatcher") proto.RegisterType((*ReadHints)(nil), "prometheus.ReadHints") + proto.RegisterType((*Chunk)(nil), "prometheus.Chunk") + proto.RegisterType((*ChunkedSeries)(nil), "prometheus.ChunkedSeries") } func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } var fileDescriptor_d938547f84707355 = []byte{ - // 379 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x4f, 0xef, 0xd2, 0x40, - 0x14, 0x64, 0xdb, 0xb2, 0x95, 0x87, 0x31, 0x75, 0x83, 0xb1, 0x1a, 0x45, 0xd2, 0x53, 0x4f, 0x25, - 0xe0, 0xc9, 0xc4, 0x13, 0x49, 0x13, 0x0f, 0xd4, 0x84, 0x85, 0x93, 0x17, 0xb3, 0xc0, 0x13, 0x6a, - 0xfa, 0x67, 0xed, 0x2e, 0x26, 0x7c, 0x10, 0xbf, 0x13, 0x47, 0x3f, 0x81, 0x31, 0x7c, 0x12, 0xb3, - 0x5b, 0x10, 0x12, 0xbd, 0xfc, 0x6e, 0x6f, 0xe6, 0xcd, 0x74, 0xe6, 0x35, 0x0b, 0x7d, 0x7d, 0x94, - 0xa8, 0x12, 0xd9, 0xd4, 0xba, 0x66, 0x20, 0x9b, 0xba, 0x44, 0xbd, 0xc7, 0x83, 0x7a, 0x39, 0xd8, - 0xd5, 0xbb, 0xda, 0xd2, 0x63, 0x33, 0xb5, 0x8a, 0xe8, 0x3d, 0xd0, 0xa5, 0x28, 0x65, 0x81, 0x6c, - 0x00, 0xdd, 0xef, 0xa2, 0x38, 0x60, 0x48, 0x46, 0x24, 0x26, 0xbc, 0x05, 0xec, 0x15, 0xf4, 0x74, - 0x5e, 0xa2, 0xd2, 0xa2, 0x94, 0xa1, 0x33, 0x22, 0xb1, 0xcb, 0x6f, 0x44, 0xf4, 0x0d, 0x60, 0x95, - 0x97, 0xb8, 0xc4, 0x26, 0x47, 0xc5, 0xc6, 0x40, 0x0b, 0xb1, 0xc6, 0x42, 0x85, 0x64, 0xe4, 0xc6, - 0xfd, 0xe9, 0xd3, 0xe4, 0x16, 0x9f, 0xcc, 0xcd, 0x66, 0xe6, 0x9d, 0x7e, 0xbd, 0xe9, 0xf0, 0x8b, - 0x8c, 0x4d, 0xc1, 0x57, 0x36, 0x5c, 0x85, 0x8e, 0x75, 0xb0, 0x7b, 0x47, 0xdb, 0xeb, 0x62, 0xb9, - 0x0a, 0xa3, 0x09, 0x74, 0xed, 0xa7, 0x18, 0x03, 0xaf, 0x12, 0x65, 0x5b, 0xb7, 0xc7, 0xed, 0x7c, - 0xbb, 0xc1, 0xb1, 0x64, 0x0b, 0xa2, 0x77, 0x40, 0xe7, 0x6d, 0xe0, 0x43, 0x1b, 0x46, 0x3f, 0x08, - 0x3c, 0xb6, 0x7c, 0x26, 0xf4, 0x66, 0x8f, 0x0d, 0x9b, 0x80, 0x67, 0x7e, 0xb0, 0x4d, 0x7d, 0x32, - 0x7d, 0xfd, 0x8f, 0xff, 0xa2, 0x4b, 0x56, 0x47, 0x89, 0xdc, 0x4a, 0xff, 0x16, 0x75, 0xfe, 0x57, - 0xd4, 0xbd, 0x2f, 0x1a, 0x83, 0x67, 0x7c, 0x8c, 0x82, 0x93, 0x2e, 0x82, 0x0e, 0xf3, 0xc1, 0xfd, - 0x98, 0x2e, 0x02, 0x62, 0x08, 0x9e, 0x06, 0x8e, 0x25, 0x78, 0x1a, 0xb8, 0xd1, 0x57, 0xe8, 0x71, - 0x14, 0xdb, 0x0f, 0x79, 0xa5, 0x15, 0x7b, 0x0e, 0xbe, 0xd2, 0x28, 0x3f, 0x97, 0xca, 0xd6, 0x72, - 0x39, 0x35, 0x30, 0x53, 0x26, 0xf9, 0xcb, 0xa1, 0xda, 0x5c, 0x93, 0xcd, 0xcc, 0x5e, 0xc0, 0x23, - 0xa5, 0x45, 0xa3, 0x8d, 0xda, 0xb5, 0x6a, 0xdf, 0xe2, 0x4c, 0xb1, 0x67, 0x40, 0xb1, 0xda, 0x9a, - 0x85, 0x67, 0x17, 0x5d, 0xac, 0xb6, 0x99, 0x9a, 0x0d, 0x4e, 0xe7, 0x21, 0xf9, 0x79, 0x1e, 0x92, - 0xdf, 0xe7, 0x21, 0xf9, 0x44, 0xcd, 0xc5, 0x72, 0xbd, 0xa6, 0xf6, 0xfd, 0xbc, 0xfd, 0x13, 0x00, - 0x00, 0xff, 0xff, 0xe3, 0x8a, 0x88, 0x84, 0x70, 0x02, 0x00, 0x00, + // 496 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xcd, 0x6e, 0xd3, 0x4c, + 0x14, 0xed, 0xd8, 0x89, 0xd3, 0xdc, 0xf4, 0xfb, 0xe4, 0x8e, 0x82, 0x08, 0x15, 0x84, 0xc8, 0xab, + 0xac, 0x1c, 0x35, 0xac, 0x90, 0x58, 0x15, 0x59, 0x42, 0xa2, 0x4e, 0xd5, 0x69, 0x11, 0x88, 0x4d, + 0x35, 0x89, 0x87, 0xc4, 0x90, 0x19, 0xbb, 0x9e, 0x09, 0x6a, 0x1f, 0x84, 0xc7, 0xe0, 0x3d, 0xba, + 0xe4, 0x09, 0x10, 0xca, 0x93, 0xa0, 0xb9, 0xb6, 0xeb, 0x48, 0x65, 0x03, 0xbb, 0xfb, 0x73, 0xce, + 0x9c, 0x93, 0x93, 0x6b, 0xe8, 0x99, 0xdb, 0x5c, 0xe8, 0x30, 0x2f, 0x32, 0x93, 0x51, 0xc8, 0x8b, + 0x4c, 0x0a, 0xb3, 0x12, 0x1b, 0x7d, 0xd4, 0x5f, 0x66, 0xcb, 0x0c, 0xc7, 0x13, 0x5b, 0x95, 0x88, + 0xe0, 0x15, 0x78, 0x17, 0x5c, 0xe6, 0x6b, 0x41, 0xfb, 0xd0, 0xfe, 0xca, 0xd7, 0x1b, 0x31, 0x20, + 0x23, 0x32, 0x26, 0xac, 0x6c, 0xe8, 0x53, 0xe8, 0x9a, 0x54, 0x0a, 0x6d, 0xb8, 0xcc, 0x07, 0xce, + 0x88, 0x8c, 0x5d, 0xd6, 0x0c, 0x82, 0x6b, 0x80, 0xcb, 0x54, 0x8a, 0x0b, 0x51, 0xa4, 0x42, 0xd3, + 0x09, 0x78, 0x6b, 0x3e, 0x17, 0x6b, 0x3d, 0x20, 0x23, 0x77, 0xdc, 0x9b, 0x1e, 0x86, 0x8d, 0x7c, + 0x78, 0x6a, 0x37, 0x27, 0xad, 0xbb, 0x9f, 0xcf, 0xf7, 0x58, 0x05, 0xa3, 0x53, 0xe8, 0x68, 0x14, + 0xd7, 0x03, 0x07, 0x19, 0x74, 0x97, 0x51, 0xfa, 0xaa, 0x28, 0x35, 0x30, 0x38, 0x86, 0x36, 0x3e, + 0x45, 0x29, 0xb4, 0x14, 0x97, 0xa5, 0xdd, 0x2e, 0xc3, 0xba, 0xf9, 0x0d, 0x0e, 0x0e, 0xcb, 0x26, + 0x78, 0x09, 0xde, 0x69, 0x29, 0xf8, 0xb7, 0x0e, 0x83, 0x6f, 0x04, 0x0e, 0x70, 0x1e, 0x73, 0xb3, + 0x58, 0x89, 0x82, 0x1e, 0x43, 0xcb, 0x06, 0x8c, 0xaa, 0xff, 0x4f, 0x9f, 0x3d, 0xe0, 0x57, 0xb8, + 0xf0, 0xf2, 0x36, 0x17, 0x0c, 0xa1, 0xf7, 0x46, 0x9d, 0x3f, 0x19, 0x75, 0x77, 0x8d, 0x8e, 0xa1, + 0x65, 0x79, 0xd4, 0x03, 0x27, 0x3a, 0xf7, 0xf7, 0x68, 0x07, 0xdc, 0x59, 0x74, 0xee, 0x13, 0x3b, + 0x60, 0x91, 0xef, 0xe0, 0x80, 0x45, 0xbe, 0x1b, 0x7c, 0x86, 0x2e, 0x13, 0x3c, 0x79, 0x93, 0x2a, + 0xa3, 0xe9, 0x63, 0xe8, 0x68, 0x23, 0xf2, 0x2b, 0xa9, 0xd1, 0x96, 0xcb, 0x3c, 0xdb, 0xc6, 0xda, + 0x2a, 0x7f, 0xda, 0xa8, 0x45, 0xad, 0x6c, 0x6b, 0xfa, 0x04, 0xf6, 0xb5, 0xe1, 0x85, 0xb1, 0x68, + 0x17, 0xd1, 0x1d, 0xec, 0x63, 0x4d, 0x1f, 0x81, 0x27, 0x54, 0x62, 0x17, 0x2d, 0x5c, 0xb4, 0x85, + 0x4a, 0x62, 0x1d, 0x7c, 0x27, 0xd0, 0x7e, 0xbd, 0xda, 0xa8, 0x2f, 0x74, 0x08, 0x3d, 0x99, 0xaa, + 0x2b, 0xfb, 0xff, 0x37, 0x62, 0x5d, 0x99, 0x2a, 0x7b, 0x04, 0xb1, 0xc6, 0x3d, 0xbf, 0xb9, 0xdf, + 0x57, 0xe7, 0x22, 0xf9, 0x4d, 0xb5, 0x0f, 0xab, 0xf0, 0x5c, 0x0c, 0xef, 0x68, 0x37, 0x3c, 0x14, + 0x08, 0x23, 0xb5, 0xc8, 0x92, 0x54, 0x2d, 0x9b, 0xe4, 0x12, 0x6e, 0x38, 0xda, 0x39, 0x60, 0x58, + 0x07, 0x23, 0xd8, 0xaf, 0x51, 0xb4, 0x07, 0x9d, 0x77, 0xb3, 0xb7, 0xb3, 0xb3, 0xf7, 0xb3, 0x32, + 0xac, 0x0f, 0x67, 0xcc, 0x27, 0xc1, 0x35, 0xfc, 0x87, 0xaf, 0x89, 0xe4, 0x5f, 0xef, 0x72, 0x02, + 0xde, 0xc2, 0xbe, 0x50, 0x9f, 0xe5, 0xe1, 0x03, 0xa7, 0x35, 0xa1, 0x84, 0x9d, 0xf4, 0xef, 0xb6, + 0x43, 0xf2, 0x63, 0x3b, 0x24, 0xbf, 0xb6, 0x43, 0xf2, 0xd1, 0xb3, 0xe8, 0x7c, 0x3e, 0xf7, 0xf0, + 0x13, 0x7b, 0xf1, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x1e, 0x9b, 0x28, 0x1b, 0x93, 0x03, 0x00, 0x00, } func (m *Sample) Marshal() (dAtA []byte, err error) { @@ -665,6 +832,93 @@ func (m *ReadHints) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *Chunk) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Chunk) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.MinTimeMs != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintTypes(dAtA, i, uint64(m.MinTimeMs)) + } + if m.MaxTimeMs != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintTypes(dAtA, i, uint64(m.MaxTimeMs)) + } + if m.Type != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintTypes(dAtA, i, uint64(m.Type)) + } + if len(m.Data) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintTypes(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *ChunkedSeries) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ChunkedSeries) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0xa + i++ + i = encodeVarintTypes(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Chunks) > 0 { + for _, msg := range m.Chunks { + dAtA[i] = 0x12 + i++ + i = encodeVarintTypes(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -802,6 +1056,55 @@ func (m *ReadHints) Size() (n int) { return n } +func (m *Chunk) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.MinTimeMs != 0 { + n += 1 + sovTypes(uint64(m.MinTimeMs)) + } + if m.MaxTimeMs != 0 { + n += 1 + sovTypes(uint64(m.MaxTimeMs)) + } + if m.Type != 0 { + n += 1 + sovTypes(uint64(m.Type)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *ChunkedSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovTypes(x uint64) (n int) { for { n++ @@ -1507,6 +1810,273 @@ func (m *ReadHints) Unmarshal(dAtA []byte) error { } return nil } +func (m *Chunk) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Chunk: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Chunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MinTimeMs", wireType) + } + m.MinTimeMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MinTimeMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxTimeMs", wireType) + } + m.MaxTimeMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxTimeMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= Chunk_Encoding(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ChunkedSeries) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ChunkedSeries: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ChunkedSeries: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, Label{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, Chunk{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipTypes(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/prompb/types.proto b/prompb/types.proto index 6fc84c4058..d47d3521c2 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -23,6 +23,7 @@ message Sample { int64 timestamp = 2; } +// TimeSeries represents samples and labels for a single time series. message TimeSeries { repeated Label labels = 1 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false]; @@ -56,3 +57,25 @@ message ReadHints { int64 start_ms = 3; // Start time in milliseconds. int64 end_ms = 4; // End time in milliseconds. } + +// Chunk represents a TSDB chunk. +message Chunk { + int64 min_time_ms = 1; + int64 max_time_ms = 2; + + // We require this to match chunkenc.Encoding. + enum Encoding { + UNKNOWN = 0; + XOR = 1; + } + Encoding type = 3; + bytes data = 4; +} + +// ChunkedSeries represents single, encoded time series. +message ChunkedSeries { + // labels should be sorted. + repeated Label labels = 1 [(gogoproto.nullable) = false]; + // chunks should be sorted should not overlap in time. + repeated Chunk chunks = 2 [(gogoproto.nullable) = false]; +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index d3aaaa98bf..d28bedaa95 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -856,6 +856,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } + // Empty req.AcceptedResponseTypes means non streamed, raw samples response. + if len(req.AcceptedResponseTypes) > 0 { + http.Error(w, fmt.Sprintf("none of requested response types are implemented: %v", req.AcceptedResponseTypes), http.StatusNotImplemented) + return + } + resp := prompb.ReadResponse{ Results: make([]*prompb.QueryResult, len(req.Queries)), }