Addressed comments.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
Bartek Plotka 2019-07-20 20:20:39 +01:00
parent 0bab6be6e1
commit 030ccfad61
8 changed files with 164 additions and 176 deletions


@@ -213,13 +213,13 @@ func main() {
 	a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
 		Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
-	a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for STREAMED_XOR_CHUNKS response type.").
+	a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types.").
 		Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit)
 	a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit.").
 		Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit)
-	a.Flag("storage.remote.read-max-chunks-in-frame", "Maximum number of chunks in single frame for STREAMED_XOR_CHUNKS remote read response type. Each chunk corresponds roughly to (~3B * 120 samples) + 32B. Default is 1000 which is roughly (1000 * ~400B) + labelset, so approx. 0.4MB per frame .").
+	a.Flag("storage.remote.read-max-chunks-in-frame", "Maximum number of chunks in single frame for STREAMED_XOR_CHUNKS remote read response type. Each chunk corresponds roughly to (~3B * 120 samples) + 32B. Default is 1000 which is roughly (1000 * ~400B) + labelset, so approx. 0.4MB per frame. Be aware that client might have limit on frame size as well.").
 		Default("1000").IntVar(&cfg.web.RemoteReadMaxChunksInFrame)
 	a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert.").


@@ -430,6 +430,7 @@ func (m *ReadHints) GetEndMs() int64 {
 }
 
 // Chunk represents a TSDB chunk.
+// Time range [min, max] is inclusive.
 type Chunk struct {
 	MinTimeMs int64 `protobuf:"varint,1,opt,name=min_time_ms,json=minTimeMs,proto3" json:"min_time_ms,omitempty"`
 	MaxTimeMs int64 `protobuf:"varint,2,opt,name=max_time_ms,json=maxTimeMs,proto3" json:"max_time_ms,omitempty"`
@@ -503,9 +504,9 @@ func (m *Chunk) GetData() []byte {
 
 // ChunkedSeries represents single, encoded time series.
 type ChunkedSeries struct {
-	// labels should be sorted.
+	// Labels should be sorted.
 	Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
-	// chunks should be sorted should not overlap in time.
+	// Chunks will be in start time order and may overlap.
 	Chunks []Chunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized []byte `json:"-"`


@@ -59,6 +59,7 @@ message ReadHints {
 }
 
 // Chunk represents a TSDB chunk.
+// Time range [min, max] is inclusive.
 message Chunk {
 	int64 min_time_ms = 1;
 	int64 max_time_ms = 2;
@@ -74,8 +75,8 @@ message Chunk {
 
 // ChunkedSeries represents single, encoded time series.
 message ChunkedSeries {
-	// labels should be sorted.
+	// Labels should be sorted.
 	repeated Label labels = 1 [(gogoproto.nullable) = false];
-	// chunks should be sorted should not overlap in time.
+	// Chunks will be in start time order and may overlap.
 	repeated Chunk chunks = 2 [(gogoproto.nullable) = false];
 }
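
The updated comments pin down the contract for a streamed series: labels sorted, chunks in start time order (overlap allowed), and [min, max] inclusive. A minimal sketch of a conforming message built with the generated Go types; the Chunk's Type/Data fields and the XOR encoding value live in the same prompb package even though this hunk does not show them.

```go
package example

import "github.com/prometheus/prometheus/prompb"

// exampleFrame builds one ChunkedReadResponse payload obeying the documented
// invariants: labels sorted by name, chunks ordered by start time, and each
// chunk covering the inclusive range [MinTimeMs, MaxTimeMs].
func exampleFrame(xor1, xor2 []byte) *prompb.ChunkedReadResponse {
	return &prompb.ChunkedReadResponse{
		ChunkedSeries: []*prompb.ChunkedSeries{{
			Labels: []prompb.Label{ // sorted by Name
				{Name: "__name__", Value: "up"},
				{Name: "job", Value: "prometheus"},
			},
			Chunks: []prompb.Chunk{ // start time order; ranges may overlap
				{MinTimeMs: 0, MaxTimeMs: 119999, Type: prompb.Chunk_XOR, Data: xor1},
				{MinTimeMs: 120000, MaxTimeMs: 239999, Type: prompb.Chunk_XOR, Data: xor2},
			},
		}},
	}
}
```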


@@ -15,11 +15,17 @@ package remote
 import (
 	"bufio"
 	"encoding/binary"
-	"github.com/gogo/protobuf/proto"
 	"io"
 	"net/http"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
 )
 
+// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows.
+// 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset.
+const DefaultChunkedReadLimit = 5e+7
+
 // ChunkedWriter is an io.Writer wrapper that allows streaming by adding uvarint delimiter before each write in a form
 // of length of the corresponded byte array.
 type ChunkedWriter struct {
@@ -61,11 +67,12 @@ func (w *ChunkedWriter) Write(b []byte) (int, error) {
 type ChunkedReader struct {
 	b         *bufio.Reader
 	data      []byte
+	sizeLimit uint64
 }
 
 // NewChunkedReader constructs a ChunkedReader.
-func NewChunkedReader(r io.Reader) *ChunkedReader {
-	return &ChunkedReader{b: bufio.NewReader(r)}
+func NewChunkedReader(r io.Reader, sizeLimit uint64) *ChunkedReader {
+	return &ChunkedReader{b: bufio.NewReader(r), sizeLimit: sizeLimit}
 }
 
 // Next returns the next length-delimited record from the input, or io.EOF if
@@ -80,6 +87,10 @@ func (r *ChunkedReader) Next() ([]byte, error) {
 		return nil, err
 	}
 
+	if size > r.sizeLimit {
+		return nil, errors.Errorf("chunkedReader: message size exceeded the limit %v bytes; got: %v bytes", r.sizeLimit, size)
+	}
+
 	if cap(r.data) < int(size) {
 		r.data = make([]byte, size)
 	} else {
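
For context, the wire format these two types agree on is nothing more than a uvarint length prefix followed by that many payload bytes, repeated until EOF; the new sizeLimit guard rejects a frame before its buffer is allocated. A hand-rolled sketch of both sides, using only the standard library (this is not code from the change):

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readFrames decodes the stream format ChunkedWriter produces: each frame is
// uvarint(len(payload)) followed by the payload. sizeLimit plays the same role
// as ChunkedReader's new field: oversized frames are rejected before any
// buffer is allocated for them.
func readFrames(r io.Reader, sizeLimit uint64) ([][]byte, error) {
	br := bufio.NewReader(r)
	var frames [][]byte
	for {
		size, err := binary.ReadUvarint(br)
		if err == io.EOF {
			return frames, nil
		}
		if err != nil {
			return nil, err
		}
		if size > sizeLimit {
			return nil, fmt.Errorf("frame size %d exceeds limit %d", size, sizeLimit)
		}
		data := make([]byte, size)
		if _, err := io.ReadFull(br, data); err != nil {
			return nil, err
		}
		frames = append(frames, data)
	}
}

func main() {
	// Write one frame by hand: length prefix, then payload.
	var buf bytes.Buffer
	var lenBuf [binary.MaxVarintLen64]byte
	payload := []byte("hello")
	n := binary.PutUvarint(lenBuf[:], uint64(len(payload)))
	buf.Write(lenBuf[:n])
	buf.Write(payload)

	frames, err := readFrames(&buf, 1024)
	fmt.Println(len(frames), string(frames[0]), err) // 1 hello <nil>
}
```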


@@ -14,9 +14,10 @@ package remote
 import (
 	"bytes"
-	"github.com/prometheus/prometheus/util/testutil"
 	"io"
 	"testing"
+
+	"github.com/prometheus/prometheus/util/testutil"
 )
 
 type mockedFlusher struct {
@@ -27,11 +28,11 @@ func (f *mockedFlusher) Flush() {
 	f.flushed++
 }
 
-func TestStreamReaderCanReadWriter(t *testing.T) {
+func TestChunkedReaderCanReadFromChunkedWriter(t *testing.T) {
 	b := &bytes.Buffer{}
 	f := &mockedFlusher{}
 	w := NewChunkedWriter(b, f)
-	r := NewChunkedReader(b)
+	r := NewChunkedReader(b, 20)
 
 	msgs := [][]byte{
 		[]byte("test1"),
@@ -70,3 +71,20 @@ func TestStreamReaderCanReadWriter(t *testing.T) {
 	testutil.Equals(t, 5, f.flushed)
 }
+
+func TestChunkedReader_Overflow(t *testing.T) {
+	b := &bytes.Buffer{}
+	_, err := NewChunkedWriter(b, &mockedFlusher{}).Write([]byte("twelve bytes"))
+	testutil.Ok(t, err)
+
+	b2 := make([]byte, 12)
+	copy(b2, b.Bytes())
+
+	ret, err := NewChunkedReader(b, 12).Next()
+	testutil.Ok(t, err)
+	testutil.Equals(t, "twelve bytes", string(ret))
+
+	_, err = NewChunkedReader(bytes.NewReader(b2), 11).Next()
+	testutil.NotOk(t, err, "expect exceed limit error")
+	testutil.Equals(t, "chunkedReader: message size exceeded the limit 11 bytes; got: 12 bytes", err.Error())
+}


@@ -73,9 +73,6 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
 		return err
 	}
 
-	w.Header().Set("Content-Type", "application/x-protobuf")
-	w.Header().Set("Content-Encoding", "snappy")
-
 	compressed := snappy.Encode(nil, data)
 	_, err = w.Write(compressed)
 	return err
@@ -164,6 +161,20 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
 	}
 }
 
+// NegotiateResponseType returns first accepted response type that this server supports.
+func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) prompb.ReadRequest_ResponseType {
+	supported := map[prompb.ReadRequest_ResponseType]struct{}{
+		prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
+	}
+
+	for _, resType := range accepted {
+		if _, ok := supported[resType]; ok {
+			return resType
+		}
+	}
+	return -1
+}
+
 // StreamChunkedReadResponses iterates over series, build chunks and streams those to caller.
 // TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of rencoding everything.
 func StreamChunkedReadResponses(
@@ -183,9 +194,9 @@ func StreamChunkedReadResponses(
 	iter := series.Iterator()
 	lbls := MergeLabels(labelsToLabelsProto(series.Labels()), sortedExternalLabels)
 
-	// TODO(bwplotka): We send each series in separate frame no matter what. Even if series has only one sample.
-	// I think we should pack strictly based on number chunks not necessarily from the same series. Thoughts?
+	// Send at most one series per frame; series may be split over multiple frames according to maxChunksInFrame.
 	for {
+		// TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/tsdb/pull/665
 		chks, err = encodeChunks(iter, chks, maxChunksInFrame)
 		if err != nil {
 			return err
@@ -196,7 +207,6 @@ func StreamChunkedReadResponses(
 		}
 
 		b, err := proto.Marshal(&prompb.ChunkedReadResponse{
-			// TODO(bwplotka): Do we really need multiple?
 			ChunkedSeries: []*prompb.ChunkedSeries{
 				{
 					Labels: lbls,
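
The now-exported NegotiateResponseType is what remoteRead (below) drives its switch with: the client lists acceptable response types in preference order, the server picks the first one it supports, and -1 signals no match. A small usage sketch; the import paths are assumed from the file locations in this diff.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	// The client advertises what it can consume, in order of preference.
	accepted := []prompb.ReadRequest_ResponseType{
		prompb.ReadRequest_STREAMED_XOR_CHUNKS,
	}

	switch remote.NegotiateResponseType(accepted) {
	case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
		fmt.Println("stream ChunkedReadResponse frames")
	default:
		// NegotiateResponseType returns -1 when nothing matched, so an empty
		// or unknown list falls through to the sampled, snappy-compressed
		// ReadResponse, as the api.go change below spells out.
		fmt.Println("fall back to sampled ReadResponse")
	}
}
```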


@@ -843,20 +843,6 @@ func (api *API) serveFlags(r *http.Request) apiFuncResult {
 	return apiFuncResult{api.flagsMap, nil, nil, nil}
 }
 
-// negotiateResponseType returns first accepted response type that this server supports.
-func negotiateResponseType(accepted []prompb.ReadRequest_ResponseType) prompb.ReadRequest_ResponseType {
-	supported := map[prompb.ReadRequest_ResponseType]struct{}{
-		prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
-	}
-
-	for _, resType := range accepted {
-		if _, ok := supported[resType]; ok {
-			return resType
-		}
-	}
-	return -1
-}
-
 func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 	ctx := r.Context()
 	if err := api.remoteReadGate.Start(ctx); err != nil {
@@ -874,12 +860,86 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	switch negotiateResponseType(req.AcceptedResponseTypes) {
+	externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
+
+	// Add external labels back in, in sorted order.
+	sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
+	for name, value := range externalLabels {
+		sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
+			Name:  string(name),
+			Value: string(value),
+		})
+	}
+	sort.Slice(sortedExternalLabels, func(i, j int) bool {
+		return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
+	})
+
+	switch remote.NegotiateResponseType(req.AcceptedResponseTypes) {
 	case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
-		api.streamedChunkedRemoteRead(ctx, w, req)
+		w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
+		// TODO(bwplotka): Should we use snappy? benchmark to see.
+		// w.Header().Set("Content-Encoding", "snappy")
+
+		f, ok := w.(http.Flusher)
+		if !ok {
+			http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
+			return
+		}
+
+		for i, query := range req.Queries {
+			err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error {
+				return remote.StreamChunkedReadResponses(
+					remote.NewChunkedWriter(w, f),
+					int64(i),
+					set,
+					sortedExternalLabels,
+					api.remoteReadMaxChunksInFrame,
+				)
+			})
+			if err != nil {
+				if httpErr, ok := err.(remote.HTTPError); ok {
+					http.Error(w, httpErr.Error(), httpErr.Status())
+					return
+				}
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+		}
 	default:
+		w.Header().Set("Content-Type", "application/x-protobuf")
+		w.Header().Set("Content-Encoding", "snappy")
+
 		// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
-		api.sampledRemoteRead(ctx, w, req)
+		resp := prompb.ReadResponse{
+			Results: make([]*prompb.QueryResult, len(req.Queries)),
+		}
+		for i, query := range req.Queries {
+			err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error {
+				resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
+				if err != nil {
+					return err
+				}
+
+				for _, ts := range resp.Results[i].Timeseries {
+					ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
+				}
+				return nil
+			})
+			if err != nil {
+				if httpErr, ok := err.(remote.HTTPError); ok {
+					http.Error(w, httpErr.Error(), httpErr.Status())
+					return
+				}
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+		}
+
+		if err := remote.EncodeReadResponse(&resp, w); err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
 	}
 }
@@ -909,28 +969,16 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
 	return filteredMatchers, nil
 }
 
-func (api *API) sampledRemoteRead(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest) {
-	resp := prompb.ReadResponse{
-		Results: make([]*prompb.QueryResult, len(req.Queries)),
-	}
-	externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
-	for i, query := range req.Queries {
+func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(set storage.SeriesSet) error) error {
 	filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
 	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
+		return err
 	}
 
 	querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
 	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
+		return err
 	}
-	defer func() {
-		if err := querier.Close(); err != nil {
-			level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
-		}
-	}()
 
 	var selectParams *storage.SelectParams
 	if query.Hints != nil {
@@ -942,118 +990,17 @@ func (api *API) sampledRemoteRead(ctx context.Context, w http.ResponseWriter, re
 		}
 	}
 
-		set, _, err := querier.Select(selectParams, filteredMatchers...)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
-		if err != nil {
-			if httpErr, ok := err.(remote.HTTPError); ok {
-				http.Error(w, httpErr.Error(), httpErr.Status())
-				return
-			}
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		// Add external labels back in, in sorted order.
-		sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
-		for name, value := range externalLabels {
-			sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
-				Name:  string(name),
-				Value: string(value),
-			})
-		}
-		sort.Slice(sortedExternalLabels, func(i, j int) bool {
-			return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
-		})
-
-		for _, ts := range resp.Results[i].Timeseries {
-			ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
-		}
-	}
-
-	if err := remote.EncodeReadResponse(&resp, w); err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-}
-
-func (api *API) streamedChunkedRemoteRead(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest) {
-	externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
-	for i, query := range req.Queries {
-		filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
 	defer func() {
 		if err := querier.Close(); err != nil {
 			level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
 		}
 	}()
 
-		var selectParams *storage.SelectParams
-		if query.Hints != nil {
-			selectParams = &storage.SelectParams{
-				Start: query.Hints.StartMs,
-				End:   query.Hints.EndMs,
-				Step:  query.Hints.StepMs,
-				Func:  query.Hints.Func,
-			}
-		}
-
-		// TODO(bwplotka): Change interface / find a way to select chunks.
 	set, _, err := querier.Select(selectParams, filteredMatchers...)
 	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return err
-		return
-	}
-
-	// Add external labels back in, in sorted order.
-	sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
-	for name, value := range externalLabels {
-		sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
-			Name:  string(name),
-			Value: string(value),
-		})
-	}
-	sort.Slice(sortedExternalLabels, func(i, j int) bool {
-		return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
-	})
-
-	w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
-	// TODO(bwplotka): Should we use snappy? benchmark to see.
-	// w.Header().Set("Content-Encoding", "snappy")
-
-	f, ok := w.(http.Flusher)
-	if !ok {
-		http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
-		return
-	}
-
-	if err := remote.StreamChunkedReadResponses(
-		remote.NewChunkedWriter(w, f),
-		int64(i),
-		set,
-		sortedExternalLabels,
-		api.remoteReadMaxChunksInFrame,
-	); err != nil {
-		if httpErr, ok := err.(remote.HTTPError); ok {
-			http.Error(w, httpErr.Error(), httpErr.Status())
-			return
-		}
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-	}
 	}
+	return seriesHandleFn(set)
 }
 
 func (api *API) deleteSeries(r *http.Request) apiFuncResult {
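
The net effect of the api.go change: both response paths now funnel through remoteReadQuery, which owns matcher filtering, querier lifecycle, and Select, and hands the resulting SeriesSet to a per-path callback. Condensed to its shape below; this restates the hunks above, with the handler's in-scope values passed as parameters purely to keep the sketch self-contained.

```go
package v1

import (
	"context"
	"net/http"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

// streamQueries mirrors the STREAMED_XOR_CHUNKS branch: one remoteReadQuery
// call per query, whose callback streams frames straight to the client.
func streamQueries(api *API, ctx context.Context, w http.ResponseWriter, f http.Flusher,
	queries []*prompb.Query, extLabels map[string]string, sortedExtLabels []prompb.Label) error {
	for i, query := range queries {
		err := api.remoteReadQuery(ctx, query, extLabels, func(set storage.SeriesSet) error {
			// The sampled branch differs only inside this callback: it calls
			// remote.ToQueryResult(set, ...) and buffers the result instead.
			return remote.StreamChunkedReadResponses(
				remote.NewChunkedWriter(w, f),
				int64(i),
				set,
				sortedExtLabels,
				api.remoteReadMaxChunksInFrame,
			)
		})
		if err != nil {
			return err
		}
	}
	return nil
}
```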


@@ -1084,7 +1084,7 @@ func TestStreamReadEndpoint(t *testing.T) {
 	testutil.Equals(t, "", recorder.Result().Header.Get("Content-Encoding"))
 
 	var results []*prompb.ChunkedReadResponse
-	stream := remote.NewChunkedReader(recorder.Result().Body)
+	stream := remote.NewChunkedReader(recorder.Result().Body, remote.DefaultChunkedReadLimit)
 	for {
 		res := &prompb.ChunkedReadResponse{}
 		err := stream.NextProto(res)
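
Putting the pieces together, a client consumes the streamed body the same way this test does: wrap it in a ChunkedReader capped at DefaultChunkedReadLimit and decode frames until io.EOF. A sketch with the HTTP request setup elided; NextProto is the proto-decoding counterpart of Next used above.

```go
package example

import (
	"io"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// readStream drains one streamed remote read response body, decoding each
// length-delimited frame into a ChunkedReadResponse until EOF.
func readStream(body io.Reader) ([]*prompb.ChunkedReadResponse, error) {
	stream := remote.NewChunkedReader(body, remote.DefaultChunkedReadLimit)
	var results []*prompb.ChunkedReadResponse
	for {
		res := &prompb.ChunkedReadResponse{}
		err := stream.NextProto(res)
		if err == io.EOF {
			return results, nil
		}
		if err != nil {
			return nil, err
		}
		results = append(results, res)
	}
}
```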