Moved to bytes limit.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
Bartek Plotka 2019-07-30 14:14:36 +01:00
parent 12e0a2ede3
commit b46c0f32aa
5 changed files with 52 additions and 42 deletions

@@ -219,8 +219,8 @@ func main() {
 	a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit.").
 		Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit)
-	a.Flag("storage.remote.read-max-chunks-in-frame", "Maximum number of chunks in single frame for STREAMED_XOR_CHUNKS remote read response type. Each chunk corresponds roughly to (~3B * 120 samples) + 32B. Default is 1000 which is roughly (1000 * ~400B) + labelset, so approx. 0.4MB per frame. Be aware that client might have limit on frame size as well.").
-		Default("1000").IntVar(&cfg.web.RemoteReadMaxChunksInFrame)
+	a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well.").
+		Default("1000").IntVar(&cfg.web.RemoteReadBytesInFrame)
 	a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert.").
 		Default("1h").SetValue(&cfg.outageTolerance)

@@ -188,11 +188,12 @@ func StreamChunkedReadResponses(
 	queryIndex int64,
 	ss storage.SeriesSet,
 	sortedExternalLabels []prompb.Label,
-	maxChunksInFrame int,
+	maxBytesInFrame int,
 ) error {
 	var (
-		chks = make([]prompb.Chunk, 0, maxChunksInFrame)
+		chks     []prompb.Chunk
 		err      error
+		lblsSize int
 	)

 	for ss.Next() {
@@ -200,10 +201,15 @@ func StreamChunkedReadResponses(
 		iter := series.Iterator()
 		lbls := MergeLabels(labelsToLabelsProto(series.Labels()), sortedExternalLabels)

-		// Send at most one series per frame; series may be split over multiple frames according to maxChunksInFrame.
+		lblsSize = 0
+		for _, lbl := range lbls {
+			lblsSize += lbl.Size()
+		}
+
+		// Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
 		for {
 			// TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/tsdb/pull/665
-			chks, err = encodeChunks(iter, chks, maxChunksInFrame)
+			chks, err = encodeChunks(iter, chks, maxBytesInFrame-lblsSize)
 			if err != nil {
 				return err
 			}
@@ -244,7 +250,7 @@ func StreamChunkedReadResponses(
 }

 // encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking).
-func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks int) ([]prompb.Chunk, error) {
+func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, frameBytesLeft int) ([]prompb.Chunk, error) {
 	const maxSamplesInChunk = 120

 	var (
@@ -280,8 +286,10 @@ func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks int) ([]prompb.Chunk, error) {
 				Data: chk.Bytes(),
 			})
 			chk = nil
+			frameBytesLeft -= chks[len(chks)-1].Size()

-			if len(chks) >= maxChunks {
+			// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
+			if frameBytesLeft <= 0 {
 				break
 			}
 		}
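Reviewer note: the budgeting rule these hunks implement is easy to check in isolation. The sketch below mirrors it with toy types standing in for prompb (the chunk type, its Size() overhead, and packFrame are illustrative, not the real API): subtract the labelset size from the frame limit once per series, then stop appending chunks when the remaining budget is spent, overshooting by at most one full chunk.

    package main

    import "fmt"

    // chunk is a toy stand-in for prompb.Chunk; Size() mimics proto's Size().
    type chunk struct{ data []byte }

    func (c chunk) Size() int { return len(c.data) + 8 } // illustrative framing overhead

    // packFrame mirrors the stop condition added to encodeChunks: spend the
    // frame's byte budget (limit minus labelset size) and accept overshooting
    // by at most one full chunk, exactly as the comment in the diff allows.
    func packFrame(chunks []chunk, maxBytesInFrame, lblsSize int) []chunk {
        frameBytesLeft := maxBytesInFrame - lblsSize
        var frame []chunk
        for _, c := range chunks {
            frame = append(frame, c)
            frameBytesLeft -= c.Size()
            if frameBytesLeft <= 0 {
                break
            }
        }
        return frame
    }

    func main() {
        // Three ~240B chunks against the test's budget: 57B labels + 480B for chunks.
        chunks := []chunk{{make([]byte, 240)}, {make([]byte, 240)}, {make([]byte, 240)}}
        frame := packFrame(chunks, 57+480, 57)
        fmt.Println("chunks in frame:", len(frame)) // 2: budget exhausted after the second chunk
    }

Checking the budget only after appending keeps every frame non-empty, so a single chunk larger than the whole limit still makes progress instead of stalling the stream.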

@@ -147,13 +147,13 @@ type API struct {
 	flagsMap map[string]string
 	ready    func(http.HandlerFunc) http.HandlerFunc
 	db       func() TSDBAdmin

 	enableAdmin bool
 	logger      log.Logger

 	remoteReadSampleLimit int
-	remoteReadMaxChunksInFrame int
+	remoteReadMaxBytesInFrame int
 	remoteReadGate *gate.Gate
 	CORSOrigin     *regexp.Regexp
 }

 func init() {
@@ -176,7 +176,7 @@ func NewAPI(
 	rr rulesRetriever,
 	remoteReadSampleLimit int,
 	remoteReadConcurrencyLimit int,
-	remoteReadMaxChunksInFrame int,
+	remoteReadMaxBytesInFrame int,
 	CORSOrigin *regexp.Regexp,
 ) *API {
 	return &API{
@@ -185,18 +185,18 @@ func NewAPI(
 		targetRetriever:       tr,
 		alertmanagerRetriever: ar,

 		now:         time.Now,
 		config:      configFunc,
 		flagsMap:    flagsMap,
 		ready:       readyFunc,
 		db:          db,
 		enableAdmin: enableAdmin,

 		rulesRetriever:        rr,
 		remoteReadSampleLimit: remoteReadSampleLimit,
 		remoteReadGate:        gate.New(remoteReadConcurrencyLimit),
-		remoteReadMaxChunksInFrame: remoteReadMaxChunksInFrame,
+		remoteReadMaxBytesInFrame:  remoteReadMaxBytesInFrame,
 		logger:     logger,
 		CORSOrigin: CORSOrigin,
 	}
 }
@@ -896,7 +896,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 			int64(i),
 			set,
 			sortedExternalLabels,
-			api.remoteReadMaxChunksInFrame,
+			api.remoteReadMaxBytesInFrame,
 		)
 	})
 	if err != nil {

@@ -1017,13 +1017,13 @@ func TestSampledReadEndpoint(t *testing.T) {
 }

 func TestStreamReadEndpoint(t *testing.T) {
 	// 3 series.
-	// First with one sample. We expect 1 frame with 1 chunk.
+	// First with 119 samples. We expect 1 frame with 1 chunk.
 	// Second with 121 samples. We expect 1 frame with 2 chunks.
-	// Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series.
+	// Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit.
 	suite, err := promql.NewTest(t, `
 		load 1m
-			test_metric1{foo="bar",baz="qux"} 1
+			test_metric1{foo="bar1",baz="qux"} 0+100x119
 			test_metric1{foo="bar2",baz="qux"} 0+100x120
 			test_metric1{foo="bar3",baz="qux"} 0+100x240
 	`)
@@ -1047,9 +1047,10 @@ func TestStreamReadEndpoint(t *testing.T) {
 				},
 			}
 		},
 		remoteReadSampleLimit: 1e6,
 		remoteReadGate:        gate.New(1),
-		remoteReadMaxChunksInFrame: 2,
+		// Labelset has 57 bytes. Full chunk in test data has roughly 240 bytes. This allows us to have at max 2 chunks in this test.
+		remoteReadMaxBytesInFrame: 57 + 480,
 	}

 	// Encode the request.
@@ -1108,12 +1109,13 @@ func TestStreamReadEndpoint(t *testing.T) {
 					{Name: "b", Value: "c"},
 					{Name: "baz", Value: "qux"},
 					{Name: "d", Value: "e"},
-					{Name: "foo", Value: "bar"},
+					{Name: "foo", Value: "bar1"},
 				},
 				Chunks: []prompb.Chunk{
 					{
 						Type: prompb.Chunk_XOR,
-						Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"),
+						MaxTimeMs: 7140000,
+						Data:      []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
 					},
 				},
 			},
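The 57-byte figure in the test comment above can be reproduced from the expected labelset with the same proto Size() accounting the streaming code uses. A sketch (assumes Prometheus' prompb package as imported by the code under test):

    package main

    import (
        "fmt"

        "github.com/prometheus/prometheus/prompb"
    )

    func main() {
        // The labelset the test expects for the first series.
        lbls := []prompb.Label{
            {Name: "__name__", Value: "test_metric1"},
            {Name: "b", Value: "c"},
            {Name: "baz", Value: "qux"},
            {Name: "d", Value: "e"},
            {Name: "foo", Value: "bar1"},
        }
        size := 0
        for _, l := range lbls {
            // Each non-empty string field costs 2 bytes of framing plus its length.
            size += l.Size()
        }
        fmt.Println(size) // 57
    }

With that 57 subtracted, the 480 bytes left in the frame fit two ~240-byte chunks, which is what the test's frame expectations encode.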

@@ -228,7 +228,7 @@ type Options struct {
 	PageTitle string

 	RemoteReadSampleLimit      int
 	RemoteReadConcurrencyLimit int
-	RemoteReadMaxChunksInFrame int
+	RemoteReadBytesInFrame     int

 	Gatherer   prometheus.Gatherer
 	Registerer prometheus.Registerer
@@ -292,7 +292,7 @@ func New(logger log.Logger, o *Options) *Handler {
 		h.ruleManager,
 		h.options.RemoteReadSampleLimit,
 		h.options.RemoteReadConcurrencyLimit,
-		h.options.RemoteReadMaxChunksInFrame,
+		h.options.RemoteReadBytesInFrame,
 		h.options.CORSOrigin,
 	)