diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b043f929eb..1c78ce8dea 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -219,8 +219,8 @@ func main() { a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit."). Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit) - a.Flag("storage.remote.read-max-chunks-in-frame", "Maximum number of chunks in single frame for STREAMED_XOR_CHUNKS remote read response type. Each chunk corresponds roughly to (~3B * 120 samples) + 32B. Default is 1000 which is roughly (1000 * ~400B) + labelset, so approx. 0.4MB per frame. Be aware that client might have limit on frame size as well."). - Default("1000").IntVar(&cfg.web.RemoteReadMaxChunksInFrame) + a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well."). + Default("1000").IntVar(&cfg.web.RemoteReadBytesInFrame) a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). Default("1h").SetValue(&cfg.outageTolerance) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 5a55e51a58..21b78cbb85 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -188,11 +188,12 @@ func StreamChunkedReadResponses( queryIndex int64, ss storage.SeriesSet, sortedExternalLabels []prompb.Label, - maxChunksInFrame int, + maxBytesInFrame int, ) error { var ( - chks = make([]prompb.Chunk, 0, maxChunksInFrame) - err error + chks []prompb.Chunk + err error + lblsSize int ) for ss.Next() { @@ -200,10 +201,15 @@ func StreamChunkedReadResponses( iter := series.Iterator() lbls := MergeLabels(labelsToLabelsProto(series.Labels()), sortedExternalLabels) - // Send at most one series per frame; series may be split over multiple frames according to maxChunksInFrame. 
+ lblsSize = 0 + for _, lbl := range lbls { + lblsSize += lbl.Size() + } + + // Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for { // TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/tsdb/pull/665 - chks, err = encodeChunks(iter, chks, maxChunksInFrame) + chks, err = encodeChunks(iter, chks, maxBytesInFrame-lblsSize) if err != nil { return err } @@ -244,7 +250,7 @@ } // encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking). -func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks int) ([]prompb.Chunk, error) { +func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, frameBytesLeft int) ([]prompb.Chunk, error) { const maxSamplesInChunk = 120 var ( @@ -280,8 +286,10 @@ Data: chk.Bytes(), }) chk = nil + frameBytesLeft -= chks[len(chks)-1].Size() - if len(chks) >= maxChunks { + // We are fine with minor inaccuracy in the max bytes per frame; the overshoot is at most the size of one full chunk. 
+ if frameBytesLeft <= 0 { break } } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 6637a39d2e..252ce69c3f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -147,13 +147,13 @@ type API struct { flagsMap map[string]string ready func(http.HandlerFunc) http.HandlerFunc - db func() TSDBAdmin - enableAdmin bool - logger log.Logger - remoteReadSampleLimit int - remoteReadMaxChunksInFrame int - remoteReadGate *gate.Gate - CORSOrigin *regexp.Regexp + db func() TSDBAdmin + enableAdmin bool + logger log.Logger + remoteReadSampleLimit int + remoteReadMaxBytesInFrame int + remoteReadGate *gate.Gate + CORSOrigin *regexp.Regexp } func init() { @@ -176,7 +176,7 @@ func NewAPI( rr rulesRetriever, remoteReadSampleLimit int, remoteReadConcurrencyLimit int, - remoteReadMaxChunksInFrame int, + remoteReadMaxBytesInFrame int, CORSOrigin *regexp.Regexp, ) *API { return &API{ @@ -185,18 +185,18 @@ func NewAPI( targetRetriever: tr, alertmanagerRetriever: ar, - now: time.Now, - config: configFunc, - flagsMap: flagsMap, - ready: readyFunc, - db: db, - enableAdmin: enableAdmin, - rulesRetriever: rr, - remoteReadSampleLimit: remoteReadSampleLimit, - remoteReadGate: gate.New(remoteReadConcurrencyLimit), - remoteReadMaxChunksInFrame: remoteReadMaxChunksInFrame, - logger: logger, - CORSOrigin: CORSOrigin, + now: time.Now, + config: configFunc, + flagsMap: flagsMap, + ready: readyFunc, + db: db, + enableAdmin: enableAdmin, + rulesRetriever: rr, + remoteReadSampleLimit: remoteReadSampleLimit, + remoteReadGate: gate.New(remoteReadConcurrencyLimit), + remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame, + logger: logger, + CORSOrigin: CORSOrigin, } } @@ -896,7 +896,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { int64(i), set, sortedExternalLabels, - api.remoteReadMaxChunksInFrame, + api.remoteReadMaxBytesInFrame, ) }) if err != nil { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index e1f3b9dee8..642943144e 100644 --- 
a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1017,13 +1017,13 @@ func TestSampledReadEndpoint(t *testing.T) { } func TestStreamReadEndpoint(t *testing.T) { - // 3 series. - // First with one sample. We expect 1 frame with 1 chunk. + // 3 series, each with a different number of samples. + // First with 119 samples. We expect 1 frame with 1 chunk. Second with 121 samples, We expect 1 frame with 2 chunks. - // Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series. + // Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit. suite, err := promql.NewTest(t, ` load 1m - test_metric1{foo="bar",baz="qux"} 1 + test_metric1{foo="bar1",baz="qux"} 0+100x119 test_metric1{foo="bar2",baz="qux"} 0+100x120 test_metric1{foo="bar3",baz="qux"} 0+100x240 `) @@ -1047,9 +1047,10 @@ func TestStreamReadEndpoint(t *testing.T) { }, } }, - remoteReadSampleLimit: 1e6, - remoteReadGate: gate.New(1), - remoteReadMaxChunksInFrame: 2, + remoteReadSampleLimit: 1e6, + remoteReadGate: gate.New(1), + // Labelset has 57 bytes. Full chunk in test data has roughly 240 bytes. This allows us to have at max 2 chunks in this test. + remoteReadMaxBytesInFrame: 57 + 480, } // Encode the request. 
@@ -1108,12 +1109,13 @@ func TestStreamReadEndpoint(t *testing.T) { {Name: "b", Value: "c"}, {Name: "baz", Value: "qux"}, {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar"}, + {Name: "foo", Value: "bar1"}, }, Chunks: []prompb.Chunk{ { - Type: prompb.Chunk_XOR, - Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"), + Type: prompb.Chunk_XOR, + MaxTimeMs: 7140000, + Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"), }, }, }, diff --git a/web/web.go b/web/web.go index a994781691..6d956a7dc6 100644 --- a/web/web.go +++ b/web/web.go @@ -228,7 +228,7 @@ type Options struct { PageTitle string RemoteReadSampleLimit int RemoteReadConcurrencyLimit int - RemoteReadMaxChunksInFrame int + RemoteReadBytesInFrame int Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -292,7 +292,7 @@ func New(logger log.Logger, o *Options) *Handler { h.ruleManager, h.options.RemoteReadSampleLimit, h.options.RemoteReadConcurrencyLimit, - h.options.RemoteReadMaxChunksInFrame, + h.options.RemoteReadBytesInFrame, h.options.CORSOrigin, )