diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 464031dc24..a90bb2054b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -213,12 +213,15 @@ func main() { a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) - a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit."). + a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for STREAMED_XOR_CHUNKS response type."). Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit) a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit."). Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit) + a.Flag("storage.remote.read-max-chunks-in-frame", "Maximum number of chunks in single frame for STREAMED_XOR_CHUNKS remote read response type. Each chunk corresponds roughly to (~3B * 120 samples) + 32B. Default is 1000 which is roughly (1000 * ~400B) + labelset, so approx. 0.4MB per frame ."). + Default("1000").IntVar(&cfg.web.RemoteReadMaxChunksInFrame) + a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). Default("1h").SetValue(&cfg.outageTolerance) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index dc4ee28097..6deef17a77 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -24,10 +24,10 @@ import ( "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/tsdb/chunkenc" ) // decodeReadLimit is the maximum size of a read request body in bytes. @@ -106,25 +106,6 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams }, nil } -// FromQuery unpacks a Query proto. -func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, *storage.SelectParams, error) { - matchers, err := fromLabelMatchers(req.Matchers) - if err != nil { - return 0, 0, nil, nil, err - } - var selectParams *storage.SelectParams - if req.Hints != nil { - selectParams = &storage.SelectParams{ - Start: req.Hints.StartMs, - End: req.Hints.EndMs, - Step: req.Hints.StepMs, - Func: req.Hints.Func, - } - } - - return req.StartTimestampMs, req.EndTimestampMs, matchers, selectParams, nil -} - // ToQueryResult builds a QueryResult proto. func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) { numSamples := 0 @@ -183,6 +164,156 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { } } +// StreamChunkedReadResponses iteraties over series, build chunks and streams those to caller. +// TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of rencoding everything. +func StreamChunkedReadResponses( + stream io.Writer, + queryIndex int64, + ss storage.SeriesSet, + sortedExternalLabels []prompb.Label, + maxChunksInFrame int, +) error { + var ( + chks = make([]prompb.Chunk, 0, maxChunksInFrame) + err error + ) + + for ss.Next() { + series := ss.At() + iter := series.Iterator() + lbls := MergeLabels(labelsToLabelsProto(series.Labels()), sortedExternalLabels) + + for { + chks, err = encodeChunks(iter, chks, maxChunksInFrame) + if err != nil { + return err + } + + if len(chks) == 0 { + break + } + + b, err := proto.Marshal(&prompb.ChunkedReadResponse{ + // TODO(bwplotka): Do we really need multiple? + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: lbls, + Chunks: chks, + }, + }, + QueryIndex: queryIndex, + }) + if err != nil { + return errors.Wrap(err, "marshal ChunkedReadResponse") + } + + if _, err := stream.Write(b); err != nil { + return errors.Wrap(err, "write to stream") + } + + chks = chks[:0] + } + + if err := iter.Err(); err != nil { + return err + } + } + if err := ss.Err(); err != nil { + return err + } + + return nil +} + +// encodeChunks expects iterator to be ready to use (aka iter.Next() done before invoking). +func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks int) ([]prompb.Chunk, error) { + const maxSamplesInChunk = 120 + + var ( + chkMint int64 + chkMaxt int64 + chk *chunkenc.XORChunk + app chunkenc.Appender + err error + + numSamples = 0 + ) + + for iter.Next() { + numSamples++ + + if chk == nil { + chk = chunkenc.NewXORChunk() + app, err = chk.Appender() + if err != nil { + return nil, err + } + chkMint, _ = iter.At() + } + + app.Append(iter.At()) + chkMaxt, _ = iter.At() + + if chk.NumSamples() < maxSamplesInChunk { + continue + } + + // Cut the chunk. + chks = append(chks, prompb.Chunk{ + MinTimeMs: chkMint, + MaxTimeMs: chkMaxt, + Type: prompb.Chunk_Encoding(chk.Encoding()), + Data: chk.Bytes(), + }) + chk = nil + + if maxChunks >= len(chks) { + break + } + } + if iter.Err() != nil { + return nil, errors.Wrap(iter.Err(), "iter TSDB series") + } + + if chk != nil { + // Cut the chunk if exists. + chks = append(chks, prompb.Chunk{ + MinTimeMs: chkMint, + MaxTimeMs: chkMaxt, + Type: prompb.Chunk_Encoding(chk.Encoding()), + Data: chk.Bytes(), + }) + } + return chks, nil +} + +// MergeLabels merges two sets of sorted proto labels, preferring those in +// primary to those in secondary when there is an overlap. +func MergeLabels(primary, secondary []prompb.Label) []prompb.Label { + result := make([]prompb.Label, 0, len(primary)+len(secondary)) + i, j := 0, 0 + for i < len(primary) && j < len(secondary) { + if primary[i].Name < secondary[j].Name { + result = append(result, primary[i]) + i++ + } else if primary[i].Name > secondary[j].Name { + result = append(result, secondary[j]) + j++ + } else { + result = append(result, primary[i]) + i++ + j++ + } + } + for ; i < len(primary); i++ { + result = append(result, primary[i]) + } + for ; j < len(secondary); j++ { + result = append(result, secondary[j]) + } + return result +} + type byLabel []storage.Series func (a byLabel) Len() int { return len(a) } @@ -322,7 +453,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) return pbMatchers, nil } -func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) { +func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) { result := make([]*labels.Matcher, 0, len(matchers)) for _, matcher := range matchers { var mtype labels.MatchType diff --git a/web/api/v1/api.go b/web/api/v1/api.go index d28bedaa95..e338968c97 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -147,12 +147,13 @@ type API struct { flagsMap map[string]string ready func(http.HandlerFunc) http.HandlerFunc - db func() TSDBAdmin - enableAdmin bool - logger log.Logger - remoteReadSampleLimit int - remoteReadGate *gate.Gate - CORSOrigin *regexp.Regexp + db func() TSDBAdmin + enableAdmin bool + logger log.Logger + remoteReadSampleLimit int + remoteReadMaxChunksInFrame int + remoteReadGate *gate.Gate + CORSOrigin *regexp.Regexp } func init() { @@ -175,6 +176,7 @@ func NewAPI( rr rulesRetriever, remoteReadSampleLimit int, remoteReadConcurrencyLimit int, + remoteReadMaxChunksInFrame int, CORSOrigin *regexp.Regexp, ) *API { return &API{ @@ -183,17 +185,18 @@ func NewAPI( targetRetriever: tr, alertmanagerRetriever: ar, - now: time.Now, - config: configFunc, - flagsMap: flagsMap, - ready: readyFunc, - db: db, - enableAdmin: enableAdmin, - rulesRetriever: rr, - remoteReadSampleLimit: remoteReadSampleLimit, - remoteReadGate: gate.New(remoteReadConcurrencyLimit), - logger: logger, - CORSOrigin: CORSOrigin, + now: time.Now, + config: configFunc, + flagsMap: flagsMap, + ready: readyFunc, + db: db, + enableAdmin: enableAdmin, + rulesRetriever: rr, + remoteReadSampleLimit: remoteReadSampleLimit, + remoteReadGate: gate.New(remoteReadConcurrencyLimit), + remoteReadMaxChunksInFrame: remoteReadMaxChunksInFrame, + logger: logger, + CORSOrigin: CORSOrigin, } } @@ -840,8 +843,23 @@ func (api *API) serveFlags(r *http.Request) apiFuncResult { return apiFuncResult{api.flagsMap, nil, nil, nil} } +// negotiateResponseType returns first accepted response type that this server supports. +func negotiateResponseType(accepted []prompb.ReadRequest_ResponseType) prompb.ReadRequest_ResponseType { + supported := map[prompb.ReadRequest_ResponseType]struct{}{ + prompb.ReadRequest_STREAMED_XOR_CHUNKS: {}, + } + + for _, resType := range accepted { + if _, ok := supported[resType]; ok { + return resType + } + } + return -1 +} + func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { - if err := api.remoteReadGate.Start(r.Context()); err != nil { + ctx := r.Context() + if err := api.remoteReadGate.Start(ctx); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -856,45 +874,71 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } - // Empty req.AcceptedResponseTypes means non streamed, raw samples response. - if len(req.AcceptedResponseTypes) > 0 { - http.Error(w, fmt.Sprintf("none of requested response types are implemented: %v", req.AcceptedResponseTypes), http.StatusNotImplemented) - return + switch negotiateResponseType(req.AcceptedResponseTypes) { + case prompb.ReadRequest_STREAMED_XOR_CHUNKS: + api.streamedChunkedRemoteRead(ctx, w, req) + default: + // On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response. + api.sampledRemoteRead(ctx, w, req) + } +} + +// filterExtLabelsFromMatchers change equality matchers which match external labels +// to a matcher that looks for an empty label, +// as that label should not be present in the storage. +func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) { + matchers, err := remote.FromLabelMatchers(pbMatchers) + if err != nil { + return nil, err } + filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) + for _, m := range matchers { + value := externalLabels[m.Name] + if m.Type == labels.MatchEqual && value == m.Value { + matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") + if err != nil { + return nil, err + } + filteredMatchers = append(filteredMatchers, matcher) + } else { + filteredMatchers = append(filteredMatchers, m) + } + } + + return filteredMatchers, nil +} + +func (api *API) sampledRemoteRead(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest) { resp := prompb.ReadResponse{ Results: make([]*prompb.QueryResult, len(req.Queries)), } + externalLabels := api.config().GlobalConfig.ExternalLabels.Map() for i, query := range req.Queries { - from, through, matchers, selectParams, err := remote.FromQuery(query) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - querier, err := api.Queryable.Querier(r.Context(), from, through) + filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - defer querier.Close() - // Change equality matchers which match external labels - // to a matcher that looks for an empty label, - // as that label should not be present in the storage. - externalLabels := api.config().GlobalConfig.ExternalLabels.Map() - filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) - for _, m := range matchers { - value := externalLabels[m.Name] - if m.Type == labels.MatchEqual && value == m.Value { - matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - filteredMatchers = append(filteredMatchers, matcher) - } else { - filteredMatchers = append(filteredMatchers, m) + querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error()) + } + }() + + var selectParams *storage.SelectParams + if query.Hints != nil { + selectParams = &storage.SelectParams{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, } } @@ -926,7 +970,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { }) for _, ts := range resp.Results[i].Timeseries { - ts.Labels = mergeLabels(ts.Labels, sortedExternalLabels) + ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels) } } @@ -936,6 +980,82 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } +func (api *API) streamedChunkedRemoteRead(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest) { + externalLabels := api.config().GlobalConfig.ExternalLabels.Map() + for i, query := range req.Queries { + filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error()) + } + }() + + var selectParams *storage.SelectParams + if query.Hints != nil { + selectParams = &storage.SelectParams{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + } + } + + // TODO(bwplotka): Change interface / find a way to select chunks. + set, _, err := querier.Select(selectParams, filteredMatchers...) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Add external labels back in, in sorted order. + sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) + for name, value := range externalLabels { + sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ + Name: string(name), + Value: string(value), + }) + } + sort.Slice(sortedExternalLabels, func(i, j int) bool { + return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name + }) + + w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + // TODO(bwplotka): Should we use snappy? benchmark to see. + // w.Header().Set("Content-Encoding", "snappy") + + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError) + return + } + + if err := remote.StreamChunkedReadResponses( + remote.NewChunkedWriter(w, f), + int64(i), + set, + sortedExternalLabels, + api.remoteReadMaxChunksInFrame, + ); err != nil { + if httpErr, ok := err.(remote.HTTPError); ok { + http.Error(w, httpErr.Error(), httpErr.Status()) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func (api *API) deleteSeries(r *http.Request) apiFuncResult { if !api.enableAdmin { return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} @@ -1073,33 +1193,6 @@ func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { panic("storage.convertMatcher: invalid matcher type") } -// mergeLabels merges two sets of sorted proto labels, preferring those in -// primary to those in secondary when there is an overlap. -func mergeLabels(primary, secondary []prompb.Label) []prompb.Label { - result := make([]prompb.Label, 0, len(primary)+len(secondary)) - i, j := 0, 0 - for i < len(primary) && j < len(secondary) { - if primary[i].Name < secondary[j].Name { - result = append(result, primary[i]) - i++ - } else if primary[i].Name > secondary[j].Name { - result = append(result, secondary[j]) - j++ - } else { - result = append(result, primary[i]) - i++ - j++ - } - } - for ; i < len(primary); i++ { - result = append(result, primary[i]) - } - for ; j < len(secondary); j++ { - result = append(result, secondary[j]) - } - return result -} - func (api *API) respond(w http.ResponseWriter, data interface{}, warnings storage.Warnings) { statusMessage := statusSuccess var warningStrings []string diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index fe9f8cf318..d56583cdf0 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "math" "net/http" @@ -381,13 +382,23 @@ func setupRemote(s storage.Storage) *httptest.Server { Results: make([]*prompb.QueryResult, len(req.Queries)), } for i, query := range req.Queries { - from, through, matchers, selectParams, err := remote.FromQuery(query) + matchers, err := remote.FromLabelMatchers(query.Matchers) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - querier, err := s.Querier(r.Context(), from, through) + var selectParams *storage.SelectParams + if query.Hints != nil { + selectParams = &storage.SelectParams{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + } + } + + querier, err := s.Querier(r.Context(), query.StartTimestampMs, query.EndTimestampMs) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -916,19 +927,17 @@ func assertAPIResponse(t *testing.T, got interface{}, exp interface{}) { } } -func TestReadEndpoint(t *testing.T) { +func TestSampledReadEndpoint(t *testing.T) { suite, err := promql.NewTest(t, ` load 1m test_metric1{foo="bar",baz="qux"} 1 `) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + defer suite.Close() - if err := suite.Run(); err != nil { - t.Fatal(err) - } + err = suite.Run() + testutil.Ok(t, err) api := &API{ Queryable: suite.Storage(), @@ -950,27 +959,22 @@ func TestReadEndpoint(t *testing.T) { // Encode the request. matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} data, err := proto.Marshal(req) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + compressed := snappy.Encode(nil, data) request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + recorder := httptest.NewRecorder() api.remoteRead(recorder, request) @@ -978,28 +982,25 @@ func TestReadEndpoint(t *testing.T) { t.Fatal(recorder.Code) } + testutil.Equals(t, "application/x-protobuf", recorder.Result().Header.Get("Content-Type")) + testutil.Equals(t, "snappy", recorder.Result().Header.Get("Content-Encoding")) + // Decode the response. compressed, err = ioutil.ReadAll(recorder.Result().Body) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) + uncompressed, err := snappy.Decode(nil, compressed) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) var resp prompb.ReadResponse err = proto.Unmarshal(uncompressed, &resp) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) if len(resp.Results) != 1 { t.Fatalf("Expected 1 result, got %d", len(resp.Results)) } - result := resp.Results[0] - expected := &prompb.QueryResult{ + testutil.Equals(t, &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ { Labels: []prompb.Label{ @@ -1012,10 +1013,148 @@ func TestReadEndpoint(t *testing.T) { Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, }, }, + }, resp.Results[0]) +} + +// TODO(bwplotka): Extend it with more test cases. +func TestStreamReadEndpoint(t *testing.T) { + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar",baz="qux"} 1 + test_metric1{foo="bar2",baz="qux"} 1 + test_metric1{foo="bar3",baz="qux"} 1 + `) + testutil.Ok(t, err) + + defer suite.Close() + + err = suite.Run() + testutil.Ok(t, err) + + api := &API{ + Queryable: suite.Storage(), + QueryEngine: suite.QueryEngine(), + config: func() config.Config { + return config.Config{ + GlobalConfig: config.GlobalConfig{ + ExternalLabels: labels.Labels{ + {Name: "baz", Value: "a"}, + {Name: "b", Value: "c"}, + {Name: "d", Value: "e"}, + }, + }, + } + }, + remoteReadSampleLimit: 1e6, + remoteReadGate: gate.New(1), } - if !reflect.DeepEqual(result, expected) { - t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected) + + // Encode the request. + matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") + testutil.Ok(t, err) + + matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") + testutil.Ok(t, err) + + query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) + testutil.Ok(t, err) + + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{query}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, } + data, err := proto.Marshal(req) + testutil.Ok(t, err) + + compressed := snappy.Encode(nil, data) + request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) + testutil.Ok(t, err) + + recorder := httptest.NewRecorder() + api.remoteRead(recorder, request) + + if recorder.Code/100 != 2 { + t.Fatal(recorder.Code) + } + + testutil.Equals(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type")) + testutil.Equals(t, "", recorder.Result().Header.Get("Content-Encoding")) + + var results []*prompb.ChunkedReadResponse + stream := remote.NewChunkedReader(recorder.Result().Body) + for { + res := &prompb.ChunkedReadResponse{} + err := stream.NextProto(res) + if err == io.EOF { + break + } + testutil.Ok(t, err) + results = append(results, res) + } + + if len(results) != 3 { + t.Fatalf("Expected 1 result, got %d", len(results)) + } + + testutil.Equals(t, []*prompb.ChunkedReadResponse{ + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar2"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*prompb.ChunkedSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar3"}, + }, + Chunks: []prompb.Chunk{ + { + Type: prompb.Chunk_XOR, + Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"), + }, + }, + }, + }, + }, + }, results) } type fakeDB struct { diff --git a/web/web.go b/web/web.go index 1cc4736fd2..a994781691 100644 --- a/web/web.go +++ b/web/web.go @@ -228,6 +228,7 @@ type Options struct { PageTitle string RemoteReadSampleLimit int RemoteReadConcurrencyLimit int + RemoteReadMaxChunksInFrame int Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -291,6 +292,7 @@ func New(logger log.Logger, o *Options) *Handler { h.ruleManager, h.options.RemoteReadSampleLimit, h.options.RemoteReadConcurrencyLimit, + h.options.RemoteReadMaxChunksInFrame, h.options.CORSOrigin, )