Addressed Brian's comments.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartek Plotka 2019-07-30 13:45:22 +01:00
parent 8f1b9cafc6
commit 12e0a2ede3
6 changed files with 52 additions and 54 deletions

View file

@ -32,8 +32,9 @@ const (
// Response headers:
// Content-Type: "application/x-protobuf"
// Content-Encoding: "snappy"
ReadRequest_SAMPLED ReadRequest_ResponseType = 0
// Server will stream a varint delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
ReadRequest_SAMPLES ReadRequest_ResponseType = 0
// Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
// Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum.
//
// Response headers:
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
@ -42,12 +43,12 @@ const (
)
var ReadRequest_ResponseType_name = map[int32]string{
0: "SAMPLED",
0: "SAMPLES",
1: "STREAMED_XOR_CHUNKS",
}
var ReadRequest_ResponseType_value = map[string]int32{
"SAMPLED": 0,
"SAMPLES": 0,
"STREAMED_XOR_CHUNKS": 1,
}
@ -113,7 +114,7 @@ type ReadRequest struct {
//
// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
// implemented by server, error is returned.
// For request that do not support `accepted_response_types` field the SAMPLED response type will be used.
// For request that do not support `accepted_response_types` field the SAMPLES response type will be used.
AcceptedResponseTypes []ReadRequest_ResponseType `protobuf:"varint,2,rep,packed,name=accepted_response_types,json=acceptedResponseTypes,proto3,enum=prometheus.ReadRequest_ResponseType" json:"accepted_response_types,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -411,33 +412,33 @@ var fileDescriptor_eefc82927d57d89b = []byte{
// 466 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0xbb, 0x4d, 0xdb, 0xa0, 0x71, 0x88, 0xc2, 0xb6, 0x25, 0xa6, 0x87, 0x34, 0xb2, 0x38,
0x58, 0x2a, 0x0a, 0x22, 0x54, 0x9c, 0x38, 0x90, 0xb6, 0x91, 0x8a, 0x48, 0xf8, 0xb3, 0x09, 0x02,
0x21, 0x24, 0xcb, 0xb1, 0x47, 0x8d, 0x45, 0x6d, 0x6f, 0x77, 0xd7, 0x52, 0xf3, 0x7a, 0x9c, 0x7a,
0x42, 0x3c, 0x01, 0x42, 0x79, 0x12, 0xb4, 0xeb, 0xb8, 0x6c, 0xe0, 0xd2, 0xdb, 0xfa, 0xfb, 0xbe,
0xf9, 0x79, 0x67, 0x3c, 0x86, 0x86, 0xc0, 0x34, 0x57, 0xd8, 0xe3, 0x22, 0x57, 0x39, 0x05, 0x2e,
0xf2, 0x14, 0xd5, 0x1c, 0x0b, 0x79, 0xe0, 0xa8, 0x05, 0x47, 0x59, 0x1a, 0x07, 0x7b, 0x17, 0xf9,
0x45, 0x6e, 0x8e, 0x4f, 0xf5, 0xa9, 0x54, 0xbd, 0x11, 0x34, 0x3e, 0x89, 0x44, 0x21, 0xc3, 0xab,
0x02, 0xa5, 0xa2, 0x2f, 0x01, 0x54, 0x92, 0xa2, 0x44, 0x91, 0xa0, 0x74, 0x49, 0xb7, 0xe6, 0x3b,
0xfd, 0x87, 0xbd, 0xbf, 0xcc, 0xde, 0x34, 0x49, 0x71, 0x62, 0xdc, 0x93, 0xad, 0x9b, 0x5f, 0x87,
0x1b, 0xcc, 0xca, 0x7b, 0x3f, 0x08, 0x38, 0x0c, 0xc3, 0xb8, 0xa2, 0x1d, 0x41, 0xfd, 0xaa, 0xb0,
0x51, 0x0f, 0x6c, 0xd4, 0x87, 0x02, 0xc5, 0x82, 0x55, 0x09, 0xfa, 0x15, 0xda, 0x61, 0x14, 0x21,
0x57, 0x18, 0x07, 0x02, 0x25, 0xcf, 0x33, 0x89, 0x81, 0xe9, 0xc0, 0xdd, 0xec, 0xd6, 0xfc, 0x66,
0xff, 0xb1, 0x5d, 0x6c, 0xbd, 0xa6, 0xc7, 0x56, 0xe9, 0xe9, 0x82, 0x23, 0xdb, 0xaf, 0x20, 0xb6,
0x2a, 0xbd, 0x63, 0x68, 0xd8, 0x02, 0x75, 0xa0, 0x3e, 0x19, 0x8c, 0xdf, 0x8f, 0x86, 0x67, 0xad,
0x0d, 0xda, 0x86, 0xdd, 0xc9, 0x94, 0x0d, 0x07, 0xe3, 0xe1, 0x59, 0xf0, 0xf9, 0x1d, 0x0b, 0x4e,
0xcf, 0x3f, 0xbe, 0x7d, 0x33, 0x69, 0x11, 0x6f, 0xa0, 0xab, 0xc2, 0x5b, 0x14, 0x7d, 0x06, 0x75,
0x81, 0xb2, 0xb8, 0x54, 0x55, 0x43, 0xed, 0xff, 0x1b, 0x32, 0x3e, 0xab, 0x72, 0xde, 0x77, 0x02,
0xdb, 0xc6, 0xa0, 0x4f, 0x80, 0x4a, 0x15, 0x0a, 0x15, 0x98, 0x89, 0xa9, 0x30, 0xe5, 0x41, 0xaa,
0x39, 0xc4, 0xaf, 0xb1, 0x96, 0x71, 0xa6, 0x95, 0x31, 0x96, 0xd4, 0x87, 0x16, 0x66, 0xf1, 0x7a,
0x76, 0xd3, 0x64, 0x9b, 0x98, 0xc5, 0x76, 0xf2, 0x18, 0xee, 0xa5, 0xa1, 0x8a, 0xe6, 0x28, 0xa4,
0x5b, 0x33, 0xb7, 0x72, 0xed, 0x5b, 0x8d, 0xc2, 0x19, 0x5e, 0x8e, 0xcb, 0x00, 0xbb, 0x4d, 0xd2,
0x23, 0xd8, 0x9e, 0x27, 0x99, 0x92, 0xee, 0x56, 0x97, 0xf8, 0x4e, 0x7f, 0xff, 0xdf, 0xe1, 0x9e,
0x6b, 0x93, 0x95, 0x19, 0x6f, 0x08, 0x8e, 0xd5, 0x1c, 0x7d, 0x71, 0xf7, 0x2d, 0x59, 0xdb, 0x8f,
0x6b, 0xd8, 0x3d, 0x9d, 0x17, 0xd9, 0x37, 0xfd, 0x71, 0xac, 0xa9, 0xbe, 0x82, 0x66, 0x54, 0xca,
0xc1, 0x1a, 0xf2, 0x91, 0x8d, 0x5c, 0x15, 0xae, 0xa8, 0xf7, 0x23, 0xfb, 0x91, 0x1e, 0x82, 0xa3,
0xd7, 0x68, 0x11, 0x24, 0x59, 0x8c, 0xd7, 0xab, 0x39, 0x81, 0x91, 0x5e, 0x6b, 0xe5, 0x64, 0xef,
0x66, 0xd9, 0x21, 0x3f, 0x97, 0x1d, 0xf2, 0x7b, 0xd9, 0x21, 0x5f, 0x76, 0x34, 0x97, 0xcf, 0x66,
0x3b, 0xe6, 0x27, 0x78, 0xfe, 0x27, 0x00, 0x00, 0xff, 0xff, 0x35, 0x74, 0x2e, 0x47, 0x43, 0x03,
0x58, 0x2a, 0x0a, 0x22, 0x54, 0x9c, 0x38, 0x90, 0x96, 0x48, 0x45, 0x24, 0xfc, 0x59, 0x07, 0x81,
0x10, 0x92, 0xe5, 0xd8, 0xa3, 0xc6, 0xa2, 0xfe, 0xd3, 0xdd, 0xb5, 0xd4, 0xbc, 0x1e, 0xa7, 0x9e,
0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0xed, 0xda, 0x0e, 0x1b, 0xb8, 0x70, 0x5b, 0x7f, 0xdf, 0x37,
0x3f, 0xef, 0x8c, 0xc7, 0xd0, 0xe2, 0x98, 0x64, 0x12, 0x07, 0x39, 0xcf, 0x64, 0x46, 0x21, 0xe7,
0x59, 0x82, 0x72, 0x81, 0x85, 0x38, 0xb2, 0xe4, 0x32, 0x47, 0x51, 0x1a, 0x47, 0x07, 0x97, 0xd9,
0x65, 0xa6, 0x8f, 0x8f, 0xd5, 0xa9, 0x54, 0x9d, 0x09, 0xb4, 0x3e, 0xf2, 0x58, 0x22, 0xc3, 0xeb,
0x02, 0x85, 0xa4, 0xcf, 0x01, 0x64, 0x9c, 0xa0, 0x40, 0x1e, 0xa3, 0xb0, 0x49, 0xbf, 0xe1, 0x5a,
0xc3, 0xfb, 0x83, 0x3f, 0xcc, 0xc1, 0x2c, 0x4e, 0xd0, 0xd3, 0xee, 0xd9, 0xce, 0xed, 0xcf, 0xe3,
0x2d, 0x66, 0xe4, 0x9d, 0xef, 0x04, 0x2c, 0x86, 0x41, 0x54, 0xd3, 0x4e, 0xa0, 0x79, 0x5d, 0x98,
0xa8, 0x7b, 0x26, 0xea, 0x7d, 0x81, 0x7c, 0xc9, 0xea, 0x04, 0xfd, 0x02, 0xdd, 0x20, 0x0c, 0x31,
0x97, 0x18, 0xf9, 0x1c, 0x45, 0x9e, 0xa5, 0x02, 0x7d, 0xdd, 0x81, 0xbd, 0xdd, 0x6f, 0xb8, 0xed,
0xe1, 0x43, 0xb3, 0xd8, 0x78, 0xcd, 0x80, 0x55, 0xe9, 0xd9, 0x32, 0x47, 0x76, 0x58, 0x43, 0x4c,
0x55, 0x38, 0xa7, 0xd0, 0x32, 0x05, 0x6a, 0x41, 0xd3, 0x1b, 0x4d, 0xdf, 0x4d, 0xc6, 0x5e, 0x67,
0x8b, 0x76, 0x61, 0xdf, 0x9b, 0xb1, 0xf1, 0x68, 0x3a, 0x7e, 0xe9, 0x7f, 0x7a, 0xcb, 0xfc, 0xf3,
0x8b, 0x0f, 0x6f, 0x5e, 0x7b, 0x1d, 0xe2, 0x8c, 0x54, 0x55, 0xb0, 0x46, 0xd1, 0x27, 0xd0, 0xe4,
0x28, 0x8a, 0x2b, 0x59, 0x37, 0xd4, 0xfd, 0xb7, 0x21, 0xed, 0xb3, 0x3a, 0xe7, 0x7c, 0x23, 0xb0,
0xab, 0x0d, 0xfa, 0x08, 0xa8, 0x90, 0x01, 0x97, 0xbe, 0x9e, 0x98, 0x0c, 0x92, 0xdc, 0x4f, 0x14,
0x87, 0xb8, 0x0d, 0xd6, 0xd1, 0xce, 0xac, 0x36, 0xa6, 0x82, 0xba, 0xd0, 0xc1, 0x34, 0xda, 0xcc,
0x6e, 0xeb, 0x6c, 0x1b, 0xd3, 0xc8, 0x4c, 0x9e, 0xc2, 0x9d, 0x24, 0x90, 0xe1, 0x02, 0xb9, 0xb0,
0x1b, 0xfa, 0x56, 0xb6, 0x79, 0xab, 0x49, 0x30, 0xc7, 0xab, 0x69, 0x19, 0x60, 0xeb, 0x24, 0x3d,
0x81, 0xdd, 0x45, 0x9c, 0x4a, 0x61, 0xef, 0xf4, 0x89, 0x6b, 0x0d, 0x0f, 0xff, 0x1e, 0xee, 0x85,
0x32, 0x59, 0x99, 0x71, 0xc6, 0x60, 0x19, 0xcd, 0xd1, 0x67, 0xff, 0xbf, 0x25, 0x1b, 0xfb, 0x71,
0x03, 0xfb, 0xe7, 0x8b, 0x22, 0xfd, 0xaa, 0x3e, 0x8e, 0x31, 0xd5, 0x17, 0xd0, 0x0e, 0x4b, 0xd9,
0xdf, 0x40, 0x3e, 0x30, 0x91, 0x55, 0x61, 0x45, 0xbd, 0x1b, 0x9a, 0x8f, 0xf4, 0x18, 0x2c, 0xb5,
0x46, 0x4b, 0x3f, 0x4e, 0x23, 0xbc, 0xa9, 0xe6, 0x04, 0x5a, 0x7a, 0xa5, 0x94, 0xb3, 0x83, 0xdb,
0x55, 0x8f, 0xfc, 0x58, 0xf5, 0xc8, 0xaf, 0x55, 0x8f, 0x7c, 0xde, 0x53, 0xdc, 0x7c, 0x3e, 0xdf,
0xd3, 0x3f, 0xc1, 0xd3, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xb6, 0x6b, 0xcd, 0x43, 0x03,
0x00, 0x00,
}

View file

@ -34,8 +34,9 @@ message ReadRequest {
// Response headers:
// Content-Type: "application/x-protobuf"
// Content-Encoding: "snappy"
SAMPLED = 0;
// Server will stream a varint delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
SAMPLES = 0;
// Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
// Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum.
//
// Response headers:
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
@ -47,7 +48,7 @@ message ReadRequest {
//
// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
// implemented by server, error is returned.
// For request that do not support `accepted_response_types` field the SAMPLED response type will be used.
// For request that do not support `accepted_response_types` field the SAMPLES response type will be used.
repeated ResponseType accepted_response_types = 2;
}

View file

@ -55,7 +55,7 @@ func NewChunkedWriter(w io.Writer, f http.Flusher) *ChunkedWriter {
// Each frame includes:
//
// 1. uvarint for the size of the data frame.
// 2. uvarint for the Castagnoli polynomial CRC-32 checksum of the data frame.
// 2. big-endian uint32 for the Castagnoli polynomial CRC-32 checksum of the data frame.
// 3. n bytes where n is given in the first uvarint.
//
// Write returns number of sent bytes for a given buffer. The number does not include delimiter and checksum bytes.
@ -75,8 +75,7 @@ func (w *ChunkedWriter) Write(b []byte) (int, error) {
return 0, err
}
v = binary.PutUvarint(buf[:], uint64(w.crc32.Sum32()))
if _, err := w.writer.Write(buf[:v]); err != nil {
if err := binary.Write(w.writer, binary.BigEndian, w.crc32.Sum32()); err != nil {
return 0, err
}
@ -107,7 +106,7 @@ func NewChunkedReader(r io.Reader, sizeLimit uint64) *ChunkedReader {
// Next returns the next length-delimited record from the input, or io.EOF if
// there are no more records available. Returns io.ErrUnexpectedEOF if a short
// record is found, with a length of n but fewer than n bytes of data.
// Next also verifies the CRC32 checksum.
// Next also verifies the given checksum with Castagnoli polynomial CRC-32 checksum.
//
// NOTE: The slice returned is valid only until a subsequent call to Next. It's a caller's responsibility to copy the
// returned slice if needed.
@ -127,8 +126,8 @@ func (r *ChunkedReader) Next() ([]byte, error) {
r.data = r.data[:size]
}
crc32, err := binary.ReadUvarint(r.b)
if err != nil {
var crc32 uint32
if err := binary.Read(r.b, binary.BigEndian, &crc32); err != nil {
return nil, err
}
@ -137,7 +136,7 @@ func (r *ChunkedReader) Next() ([]byte, error) {
return nil, err
}
if uint64(r.crc32.Sum32()) != crc32 {
if r.crc32.Sum32() != crc32 {
return nil, errors.New("chunkedReader: corrupted frame; checksum mismatch")
}
return r.data, nil

View file

@ -162,14 +162,14 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
}
// NegotiateResponseType returns first accepted response type that this server supports.
// On the empty accepted list we assume that the SAMPLED response type was requested. This is to maintain backward compatibility.
// On the empty accepted list we assume that the SAMPLES response type was requested. This is to maintain backward compatibility.
func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error) {
if len(accepted) == 0 {
accepted = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLED}
accepted = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}
}
supported := map[prompb.ReadRequest_ResponseType]struct{}{
prompb.ReadRequest_SAMPLED: {},
prompb.ReadRequest_SAMPLES: {},
prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
}
@ -243,7 +243,7 @@ func StreamChunkedReadResponses(
return nil
}
// encodeChunks expects iterator to be ready to use (aka iter.Next() done before invoking).
// encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking).
func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks int) ([]prompb.Chunk, error) {
const maxSamplesInChunk = 120

View file

@ -216,23 +216,23 @@ func TestFromQueryResultWithDuplicates(t *testing.T) {
func TestNegotiateResponseType(t *testing.T) {
r, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
prompb.ReadRequest_SAMPLED,
prompb.ReadRequest_SAMPLES,
})
testutil.Ok(t, err)
testutil.Equals(t, prompb.ReadRequest_STREAMED_XOR_CHUNKS, r)
r2, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{
prompb.ReadRequest_SAMPLED,
prompb.ReadRequest_SAMPLES,
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
})
testutil.Ok(t, err)
testutil.Equals(t, prompb.ReadRequest_SAMPLED, r2)
testutil.Equals(t, prompb.ReadRequest_SAMPLES, r2)
r3, err := NegotiateResponseType([]prompb.ReadRequest_ResponseType{})
testutil.Ok(t, err)
testutil.Equals(t, prompb.ReadRequest_SAMPLED, r3)
testutil.Equals(t, prompb.ReadRequest_SAMPLES, r3)
_, err = NegotiateResponseType([]prompb.ReadRequest_ResponseType{20})
testutil.NotOk(t, err, "expected error due to not supported requested response types")
testutil.Equals(t, "server does not support any of the requested response types: [20]; supported: map[SAMPLED:{} STREAMED_XOR_CHUNKS:{}]", err.Error())
testutil.Equals(t, "server does not support any of the requested response types: [20]; supported: map[SAMPLES:{} STREAMED_XOR_CHUNKS:{}]", err.Error())
}

View file

@ -862,7 +862,6 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
// Add external labels back in, in sorted order.
sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
for name, value := range externalLabels {
sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
@ -883,8 +882,6 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
switch responseType {
case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
// TODO(bwplotka): Should we use snappy? benchmark to see.
// w.Header().Set("Content-Encoding", "snappy")
f, ok := w.(http.Flusher)
if !ok {