remote-read: added server side chunked streaming support.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartek Plotka 2019-06-20 18:58:32 +01:00
parent a91ee49705
commit a5d0e81a9d
5 changed files with 501 additions and 133 deletions

View file

@ -213,12 +213,15 @@ func main() {
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline) Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit."). a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for STREAMED_XOR_CHUNKS response type.").
Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit) Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit)
a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit."). a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit.").
Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit) Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit)
a.Flag("storage.remote.read-max-chunks-in-frame", "Maximum number of chunks in single frame for STREAMED_XOR_CHUNKS remote read response type. Each chunk corresponds roughly to (~3B * 120 samples) + 32B. Default is 1000 which is roughly (1000 * ~400B) + labelset, so approx. 0.4MB per frame .").
Default("1000").IntVar(&cfg.web.RemoteReadMaxChunksInFrame)
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert.").
Default("1h").SetValue(&cfg.outageTolerance) Default("1h").SetValue(&cfg.outageTolerance)

View file

@ -24,10 +24,10 @@ import (
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/tsdb/chunkenc"
) )
// decodeReadLimit is the maximum size of a read request body in bytes. // decodeReadLimit is the maximum size of a read request body in bytes.
@ -106,25 +106,6 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams
}, nil }, nil
} }
// FromQuery unpacks a Query proto.
func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, *storage.SelectParams, error) {
matchers, err := fromLabelMatchers(req.Matchers)
if err != nil {
return 0, 0, nil, nil, err
}
var selectParams *storage.SelectParams
if req.Hints != nil {
selectParams = &storage.SelectParams{
Start: req.Hints.StartMs,
End: req.Hints.EndMs,
Step: req.Hints.StepMs,
Func: req.Hints.Func,
}
}
return req.StartTimestampMs, req.EndTimestampMs, matchers, selectParams, nil
}
// ToQueryResult builds a QueryResult proto. // ToQueryResult builds a QueryResult proto.
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) { func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) {
numSamples := 0 numSamples := 0
@ -183,6 +164,156 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
} }
} }
// StreamChunkedReadResponses iteraties over series, build chunks and streams those to caller.
// TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of rencoding everything.
func StreamChunkedReadResponses(
stream io.Writer,
queryIndex int64,
ss storage.SeriesSet,
sortedExternalLabels []prompb.Label,
maxChunksInFrame int,
) error {
var (
chks = make([]prompb.Chunk, 0, maxChunksInFrame)
err error
)
for ss.Next() {
series := ss.At()
iter := series.Iterator()
lbls := MergeLabels(labelsToLabelsProto(series.Labels()), sortedExternalLabels)
for {
chks, err = encodeChunks(iter, chks, maxChunksInFrame)
if err != nil {
return err
}
if len(chks) == 0 {
break
}
b, err := proto.Marshal(&prompb.ChunkedReadResponse{
// TODO(bwplotka): Do we really need multiple?
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: lbls,
Chunks: chks,
},
},
QueryIndex: queryIndex,
})
if err != nil {
return errors.Wrap(err, "marshal ChunkedReadResponse")
}
if _, err := stream.Write(b); err != nil {
return errors.Wrap(err, "write to stream")
}
chks = chks[:0]
}
if err := iter.Err(); err != nil {
return err
}
}
if err := ss.Err(); err != nil {
return err
}
return nil
}
// encodeChunks expects iterator to be ready to use (aka iter.Next() done before invoking).
func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks int) ([]prompb.Chunk, error) {
const maxSamplesInChunk = 120
var (
chkMint int64
chkMaxt int64
chk *chunkenc.XORChunk
app chunkenc.Appender
err error
numSamples = 0
)
for iter.Next() {
numSamples++
if chk == nil {
chk = chunkenc.NewXORChunk()
app, err = chk.Appender()
if err != nil {
return nil, err
}
chkMint, _ = iter.At()
}
app.Append(iter.At())
chkMaxt, _ = iter.At()
if chk.NumSamples() < maxSamplesInChunk {
continue
}
// Cut the chunk.
chks = append(chks, prompb.Chunk{
MinTimeMs: chkMint,
MaxTimeMs: chkMaxt,
Type: prompb.Chunk_Encoding(chk.Encoding()),
Data: chk.Bytes(),
})
chk = nil
if maxChunks >= len(chks) {
break
}
}
if iter.Err() != nil {
return nil, errors.Wrap(iter.Err(), "iter TSDB series")
}
if chk != nil {
// Cut the chunk if exists.
chks = append(chks, prompb.Chunk{
MinTimeMs: chkMint,
MaxTimeMs: chkMaxt,
Type: prompb.Chunk_Encoding(chk.Encoding()),
Data: chk.Bytes(),
})
}
return chks, nil
}
// MergeLabels merges two sets of sorted proto labels, preferring those in
// primary to those in secondary when there is an overlap.
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label {
result := make([]prompb.Label, 0, len(primary)+len(secondary))
i, j := 0, 0
for i < len(primary) && j < len(secondary) {
if primary[i].Name < secondary[j].Name {
result = append(result, primary[i])
i++
} else if primary[i].Name > secondary[j].Name {
result = append(result, secondary[j])
j++
} else {
result = append(result, primary[i])
i++
j++
}
}
for ; i < len(primary); i++ {
result = append(result, primary[i])
}
for ; j < len(secondary); j++ {
result = append(result, secondary[j])
}
return result
}
type byLabel []storage.Series type byLabel []storage.Series
func (a byLabel) Len() int { return len(a) } func (a byLabel) Len() int { return len(a) }
@ -322,7 +453,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error)
return pbMatchers, nil return pbMatchers, nil
} }
func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) { func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers)) result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers { for _, matcher := range matchers {
var mtype labels.MatchType var mtype labels.MatchType

View file

@ -147,12 +147,13 @@ type API struct {
flagsMap map[string]string flagsMap map[string]string
ready func(http.HandlerFunc) http.HandlerFunc ready func(http.HandlerFunc) http.HandlerFunc
db func() TSDBAdmin db func() TSDBAdmin
enableAdmin bool enableAdmin bool
logger log.Logger logger log.Logger
remoteReadSampleLimit int remoteReadSampleLimit int
remoteReadGate *gate.Gate remoteReadMaxChunksInFrame int
CORSOrigin *regexp.Regexp remoteReadGate *gate.Gate
CORSOrigin *regexp.Regexp
} }
func init() { func init() {
@ -175,6 +176,7 @@ func NewAPI(
rr rulesRetriever, rr rulesRetriever,
remoteReadSampleLimit int, remoteReadSampleLimit int,
remoteReadConcurrencyLimit int, remoteReadConcurrencyLimit int,
remoteReadMaxChunksInFrame int,
CORSOrigin *regexp.Regexp, CORSOrigin *regexp.Regexp,
) *API { ) *API {
return &API{ return &API{
@ -183,17 +185,18 @@ func NewAPI(
targetRetriever: tr, targetRetriever: tr,
alertmanagerRetriever: ar, alertmanagerRetriever: ar,
now: time.Now, now: time.Now,
config: configFunc, config: configFunc,
flagsMap: flagsMap, flagsMap: flagsMap,
ready: readyFunc, ready: readyFunc,
db: db, db: db,
enableAdmin: enableAdmin, enableAdmin: enableAdmin,
rulesRetriever: rr, rulesRetriever: rr,
remoteReadSampleLimit: remoteReadSampleLimit, remoteReadSampleLimit: remoteReadSampleLimit,
remoteReadGate: gate.New(remoteReadConcurrencyLimit), remoteReadGate: gate.New(remoteReadConcurrencyLimit),
logger: logger, remoteReadMaxChunksInFrame: remoteReadMaxChunksInFrame,
CORSOrigin: CORSOrigin, logger: logger,
CORSOrigin: CORSOrigin,
} }
} }
@ -840,8 +843,23 @@ func (api *API) serveFlags(r *http.Request) apiFuncResult {
return apiFuncResult{api.flagsMap, nil, nil, nil} return apiFuncResult{api.flagsMap, nil, nil, nil}
} }
// negotiateResponseType returns first accepted response type that this server supports.
func negotiateResponseType(accepted []prompb.ReadRequest_ResponseType) prompb.ReadRequest_ResponseType {
supported := map[prompb.ReadRequest_ResponseType]struct{}{
prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
}
for _, resType := range accepted {
if _, ok := supported[resType]; ok {
return resType
}
}
return -1
}
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
if err := api.remoteReadGate.Start(r.Context()); err != nil { ctx := r.Context()
if err := api.remoteReadGate.Start(ctx); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
@ -856,45 +874,71 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
return return
} }
// Empty req.AcceptedResponseTypes means non streamed, raw samples response. switch negotiateResponseType(req.AcceptedResponseTypes) {
if len(req.AcceptedResponseTypes) > 0 { case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
http.Error(w, fmt.Sprintf("none of requested response types are implemented: %v", req.AcceptedResponseTypes), http.StatusNotImplemented) api.streamedChunkedRemoteRead(ctx, w, req)
return default:
// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
api.sampledRemoteRead(ctx, w, req)
}
}
// filterExtLabelsFromMatchers change equality matchers which match external labels
// to a matcher that looks for an empty label,
// as that label should not be present in the storage.
func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) {
matchers, err := remote.FromLabelMatchers(pbMatchers)
if err != nil {
return nil, err
} }
filteredMatchers := make([]*labels.Matcher, 0, len(matchers))
for _, m := range matchers {
value := externalLabels[m.Name]
if m.Type == labels.MatchEqual && value == m.Value {
matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "")
if err != nil {
return nil, err
}
filteredMatchers = append(filteredMatchers, matcher)
} else {
filteredMatchers = append(filteredMatchers, m)
}
}
return filteredMatchers, nil
}
func (api *API) sampledRemoteRead(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest) {
resp := prompb.ReadResponse{ resp := prompb.ReadResponse{
Results: make([]*prompb.QueryResult, len(req.Queries)), Results: make([]*prompb.QueryResult, len(req.Queries)),
} }
externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
for i, query := range req.Queries { for i, query := range req.Queries {
from, through, matchers, selectParams, err := remote.FromQuery(query) filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
querier, err := api.Queryable.Querier(r.Context(), from, through)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
defer querier.Close()
// Change equality matchers which match external labels querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
// to a matcher that looks for an empty label, if err != nil {
// as that label should not be present in the storage. http.Error(w, err.Error(), http.StatusInternalServerError)
externalLabels := api.config().GlobalConfig.ExternalLabels.Map() return
filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) }
for _, m := range matchers { defer func() {
value := externalLabels[m.Name] if err := querier.Close(); err != nil {
if m.Type == labels.MatchEqual && value == m.Value { level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") }
if err != nil { }()
http.Error(w, err.Error(), http.StatusInternalServerError)
return var selectParams *storage.SelectParams
} if query.Hints != nil {
filteredMatchers = append(filteredMatchers, matcher) selectParams = &storage.SelectParams{
} else { Start: query.Hints.StartMs,
filteredMatchers = append(filteredMatchers, m) End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
} }
} }
@ -926,7 +970,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
}) })
for _, ts := range resp.Results[i].Timeseries { for _, ts := range resp.Results[i].Timeseries {
ts.Labels = mergeLabels(ts.Labels, sortedExternalLabels) ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
} }
} }
@ -936,6 +980,82 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
} }
} }
func (api *API) streamedChunkedRemoteRead(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest) {
externalLabels := api.config().GlobalConfig.ExternalLabels.Map()
for i, query := range req.Queries {
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
}
}()
var selectParams *storage.SelectParams
if query.Hints != nil {
selectParams = &storage.SelectParams{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
}
}
// TODO(bwplotka): Change interface / find a way to select chunks.
set, _, err := querier.Select(selectParams, filteredMatchers...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Add external labels back in, in sorted order.
sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
for name, value := range externalLabels {
sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
Name: string(name),
Value: string(value),
})
}
sort.Slice(sortedExternalLabels, func(i, j int) bool {
return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name
})
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
// TODO(bwplotka): Should we use snappy? benchmark to see.
// w.Header().Set("Content-Encoding", "snappy")
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
return
}
if err := remote.StreamChunkedReadResponses(
remote.NewChunkedWriter(w, f),
int64(i),
set,
sortedExternalLabels,
api.remoteReadMaxChunksInFrame,
); err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status())
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func (api *API) deleteSeries(r *http.Request) apiFuncResult { func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
@ -1073,33 +1193,6 @@ func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {
panic("storage.convertMatcher: invalid matcher type") panic("storage.convertMatcher: invalid matcher type")
} }
// mergeLabels merges two sets of sorted proto labels, preferring those in
// primary to those in secondary when there is an overlap.
func mergeLabels(primary, secondary []prompb.Label) []prompb.Label {
result := make([]prompb.Label, 0, len(primary)+len(secondary))
i, j := 0, 0
for i < len(primary) && j < len(secondary) {
if primary[i].Name < secondary[j].Name {
result = append(result, primary[i])
i++
} else if primary[i].Name > secondary[j].Name {
result = append(result, secondary[j])
j++
} else {
result = append(result, primary[i])
i++
j++
}
}
for ; i < len(primary); i++ {
result = append(result, primary[i])
}
for ; j < len(secondary); j++ {
result = append(result, secondary[j])
}
return result
}
func (api *API) respond(w http.ResponseWriter, data interface{}, warnings storage.Warnings) { func (api *API) respond(w http.ResponseWriter, data interface{}, warnings storage.Warnings) {
statusMessage := statusSuccess statusMessage := statusSuccess
var warningStrings []string var warningStrings []string

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math" "math"
"net/http" "net/http"
@ -381,13 +382,23 @@ func setupRemote(s storage.Storage) *httptest.Server {
Results: make([]*prompb.QueryResult, len(req.Queries)), Results: make([]*prompb.QueryResult, len(req.Queries)),
} }
for i, query := range req.Queries { for i, query := range req.Queries {
from, through, matchers, selectParams, err := remote.FromQuery(query) matchers, err := remote.FromLabelMatchers(query.Matchers)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
querier, err := s.Querier(r.Context(), from, through) var selectParams *storage.SelectParams
if query.Hints != nil {
selectParams = &storage.SelectParams{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
}
}
querier, err := s.Querier(r.Context(), query.StartTimestampMs, query.EndTimestampMs)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -916,19 +927,17 @@ func assertAPIResponse(t *testing.T, got interface{}, exp interface{}) {
} }
} }
func TestReadEndpoint(t *testing.T) { func TestSampledReadEndpoint(t *testing.T) {
suite, err := promql.NewTest(t, ` suite, err := promql.NewTest(t, `
load 1m load 1m
test_metric1{foo="bar",baz="qux"} 1 test_metric1{foo="bar",baz="qux"} 1
`) `)
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
defer suite.Close() defer suite.Close()
if err := suite.Run(); err != nil { err = suite.Run()
t.Fatal(err) testutil.Ok(t, err)
}
api := &API{ api := &API{
Queryable: suite.Storage(), Queryable: suite.Storage(),
@ -950,27 +959,22 @@ func TestReadEndpoint(t *testing.T) {
// Encode the request. // Encode the request.
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"})
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
data, err := proto.Marshal(req) data, err := proto.Marshal(req)
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
compressed := snappy.Encode(nil, data) compressed := snappy.Encode(nil, data)
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
api.remoteRead(recorder, request) api.remoteRead(recorder, request)
@ -978,28 +982,25 @@ func TestReadEndpoint(t *testing.T) {
t.Fatal(recorder.Code) t.Fatal(recorder.Code)
} }
testutil.Equals(t, "application/x-protobuf", recorder.Result().Header.Get("Content-Type"))
testutil.Equals(t, "snappy", recorder.Result().Header.Get("Content-Encoding"))
// Decode the response. // Decode the response.
compressed, err = ioutil.ReadAll(recorder.Result().Body) compressed, err = ioutil.ReadAll(recorder.Result().Body)
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
uncompressed, err := snappy.Decode(nil, compressed) uncompressed, err := snappy.Decode(nil, compressed)
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
var resp prompb.ReadResponse var resp prompb.ReadResponse
err = proto.Unmarshal(uncompressed, &resp) err = proto.Unmarshal(uncompressed, &resp)
if err != nil { testutil.Ok(t, err)
t.Fatal(err)
}
if len(resp.Results) != 1 { if len(resp.Results) != 1 {
t.Fatalf("Expected 1 result, got %d", len(resp.Results)) t.Fatalf("Expected 1 result, got %d", len(resp.Results))
} }
result := resp.Results[0] testutil.Equals(t, &prompb.QueryResult{
expected := &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{ Timeseries: []*prompb.TimeSeries{
{ {
Labels: []prompb.Label{ Labels: []prompb.Label{
@ -1012,10 +1013,148 @@ func TestReadEndpoint(t *testing.T) {
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}, },
}, },
}, resp.Results[0])
}
// TODO(bwplotka): Extend it with more test cases.
func TestStreamReadEndpoint(t *testing.T) {
suite, err := promql.NewTest(t, `
load 1m
test_metric1{foo="bar",baz="qux"} 1
test_metric1{foo="bar2",baz="qux"} 1
test_metric1{foo="bar3",baz="qux"} 1
`)
testutil.Ok(t, err)
defer suite.Close()
err = suite.Run()
testutil.Ok(t, err)
api := &API{
Queryable: suite.Storage(),
QueryEngine: suite.QueryEngine(),
config: func() config.Config {
return config.Config{
GlobalConfig: config.GlobalConfig{
ExternalLabels: labels.Labels{
{Name: "baz", Value: "a"},
{Name: "b", Value: "c"},
{Name: "d", Value: "e"},
},
},
}
},
remoteReadSampleLimit: 1e6,
remoteReadGate: gate.New(1),
} }
if !reflect.DeepEqual(result, expected) {
t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected) // Encode the request.
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
testutil.Ok(t, err)
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
testutil.Ok(t, err)
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"})
testutil.Ok(t, err)
req := &prompb.ReadRequest{
Queries: []*prompb.Query{query},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
} }
data, err := proto.Marshal(req)
testutil.Ok(t, err)
compressed := snappy.Encode(nil, data)
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
testutil.Ok(t, err)
recorder := httptest.NewRecorder()
api.remoteRead(recorder, request)
if recorder.Code/100 != 2 {
t.Fatal(recorder.Code)
}
testutil.Equals(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type"))
testutil.Equals(t, "", recorder.Result().Header.Get("Content-Encoding"))
var results []*prompb.ChunkedReadResponse
stream := remote.NewChunkedReader(recorder.Result().Body)
for {
res := &prompb.ChunkedReadResponse{}
err := stream.NextProto(res)
if err == io.EOF {
break
}
testutil.Ok(t, err)
results = append(results, res)
}
if len(results) != 3 {
t.Fatalf("Expected 1 result, got %d", len(results))
}
testutil.Equals(t, []*prompb.ChunkedReadResponse{
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Chunks: []prompb.Chunk{
{
Type: prompb.Chunk_XOR,
Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"),
},
},
},
},
},
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar2"},
},
Chunks: []prompb.Chunk{
{
Type: prompb.Chunk_XOR,
Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"),
},
},
},
},
},
{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar3"},
},
Chunks: []prompb.Chunk{
{
Type: prompb.Chunk_XOR,
Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"),
},
},
},
},
},
}, results)
} }
type fakeDB struct { type fakeDB struct {

View file

@ -228,6 +228,7 @@ type Options struct {
PageTitle string PageTitle string
RemoteReadSampleLimit int RemoteReadSampleLimit int
RemoteReadConcurrencyLimit int RemoteReadConcurrencyLimit int
RemoteReadMaxChunksInFrame int
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
Registerer prometheus.Registerer Registerer prometheus.Registerer
@ -291,6 +292,7 @@ func New(logger log.Logger, o *Options) *Handler {
h.ruleManager, h.ruleManager,
h.options.RemoteReadSampleLimit, h.options.RemoteReadSampleLimit,
h.options.RemoteReadConcurrencyLimit, h.options.RemoteReadConcurrencyLimit,
h.options.RemoteReadMaxChunksInFrame,
h.options.CORSOrigin, h.options.CORSOrigin,
) )