mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Fixed maxChunks bug and extended unit test.
Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
f02a261624
commit
c4ebc94e14
|
@ -183,6 +183,8 @@ func StreamChunkedReadResponses(
|
|||
iter := series.Iterator()
|
||||
lbls := MergeLabels(labelsToLabelsProto(series.Labels()), sortedExternalLabels)
|
||||
|
||||
// TODO(bwplotka): We send each series in separate frame no matter what. Even if series has only one sample.
|
||||
// I think we should pack strictly based on number chunks notnecessarilyy from the same series. Thoughts?
|
||||
for {
|
||||
chks, err = encodeChunks(iter, chks, maxChunksInFrame)
|
||||
if err != nil {
|
||||
|
@ -235,13 +237,9 @@ func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks in
|
|||
chk *chunkenc.XORChunk
|
||||
app chunkenc.Appender
|
||||
err error
|
||||
|
||||
numSamples = 0
|
||||
)
|
||||
|
||||
for iter.Next() {
|
||||
numSamples++
|
||||
|
||||
if chk == nil {
|
||||
chk = chunkenc.NewXORChunk()
|
||||
app, err = chk.Appender()
|
||||
|
@ -267,7 +265,7 @@ func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, maxChunks in
|
|||
})
|
||||
chk = nil
|
||||
|
||||
if maxChunks >= len(chks) {
|
||||
if len(chks) >= maxChunks {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1016,20 +1016,22 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
}, resp.Results[0])
|
||||
}
|
||||
|
||||
// TODO(bwplotka): Extend it with more test cases.
|
||||
func TestStreamReadEndpoint(t *testing.T) {
|
||||
// 3 series.
|
||||
// First with one sample. We expect 1 frame with 1 chunk.
|
||||
// Second with 121 samples, We expect 1 frame with 2 chunks.
|
||||
// Third with 241 samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series.
|
||||
suite, err := promql.NewTest(t, `
|
||||
load 1m
|
||||
test_metric1{foo="bar",baz="qux"} 1
|
||||
test_metric1{foo="bar2",baz="qux"} 1
|
||||
test_metric1{foo="bar3",baz="qux"} 1
|
||||
test_metric1{foo="bar2",baz="qux"} 0+100x120
|
||||
test_metric1{foo="bar3",baz="qux"} 0+100x240
|
||||
`)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
defer suite.Close()
|
||||
|
||||
err = suite.Run()
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, suite.Run())
|
||||
|
||||
api := &API{
|
||||
Queryable: suite.Storage(),
|
||||
|
@ -1045,8 +1047,9 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
},
|
||||
}
|
||||
},
|
||||
remoteReadSampleLimit: 1e6,
|
||||
remoteReadGate: gate.New(1),
|
||||
remoteReadSampleLimit: 1e6,
|
||||
remoteReadGate: gate.New(1),
|
||||
remoteReadMaxChunksInFrame: 2,
|
||||
}
|
||||
|
||||
// Encode the request.
|
||||
|
@ -1056,7 +1059,7 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"})
|
||||
query, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"})
|
||||
testutil.Ok(t, err)
|
||||
|
||||
req := &prompb.ReadRequest{
|
||||
|
@ -1092,8 +1095,8 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
results = append(results, res)
|
||||
}
|
||||
|
||||
if len(results) != 3 {
|
||||
t.Fatalf("Expected 1 result, got %d", len(results))
|
||||
if len(results) != 4 {
|
||||
t.Fatalf("Expected 4 result, got %d", len(results))
|
||||
}
|
||||
|
||||
testutil.Equals(t, []*prompb.ChunkedReadResponse{
|
||||
|
@ -1128,8 +1131,15 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"),
|
||||
Type: prompb.Chunk_XOR,
|
||||
MaxTimeMs: 7140000,
|
||||
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
||||
},
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MinTimeMs: 7200000,
|
||||
MaxTimeMs: 7200000,
|
||||
Data: []byte("\000\001\200\364\356\006@\307p\000\000\000\000\000\000"),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1147,8 +1157,36 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
Data: []byte("\000\001\000?\360\000\000\000\000\000\000\000"),
|
||||
Type: prompb.Chunk_XOR,
|
||||
MaxTimeMs: 7140000,
|
||||
Data: []byte("\000x\000\000\000\000\000\000\000\000\000\340\324\003\302|\005\224\000\301\254}\351z2\320O\355\264n[\007\316\224\243md\371\320\375\032Pm\nS\235\016Q\255\006P\275\250\277\312\201Z\003(3\240R\207\332\005(\017\240\322\201\332=(\023\2402\203Z\007(w\2402\201Z\017(\023\265\227\364P\033@\245\007\364\nP\033C\245\002t\036P+@e\036\364\016Pk@e\002t:P;A\245\001\364\nS\373@\245\006t\006P+C\345\002\364\006Pk@\345\036t\nP\033A\245\003\364:P\033@\245\006t\016ZJ\377\\\205\313\210\327\270\017\345+F[\310\347E)\355\024\241\366\342}(v\215(N\203)\326\207(\336\203(V\332W\362\202t4\240m\005(\377AJ\006\320\322\202t\374\240\255\003(oA\312:\3202"),
|
||||
},
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MinTimeMs: 7200000,
|
||||
MaxTimeMs: 14340000,
|
||||
Data: []byte("\000x\200\364\356\006@\307p\000\000\000\000\000\340\324\003\340>\224\355\260\277\322\200\372\005(=\240R\207:\003(\025\240\362\201z\003(\365\240r\203:\005(\r\241\322\201\372\r(\r\240R\237:\007(5\2402\201z\037(\025\2402\203:\005(\375\240R\200\372\r(\035\241\322\201:\003(5\240r\326g\364\271\213\227!\253q\037\312N\340GJ\033E)\375\024\241\266\362}(N\217(V\203)\336\207(\326\203(N\334W\322\203\2644\240}\005(\373AJ\031\3202\202\264\374\240\275\003(kA\3129\320R\201\2644\240\375\264\277\322\200\332\005(3\240r\207Z\003(\027\240\362\201Z\003(\363\240R\203\332\005(\017\241\322\201\332\r(\023\2402\237Z\007(7\2402\201Z\037(\023\240\322\200\332\005(\377\240R\200\332\r "),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar3"},
|
||||
},
|
||||
Chunks: []prompb.Chunk{
|
||||
{
|
||||
Type: prompb.Chunk_XOR,
|
||||
MinTimeMs: 14400000,
|
||||
MaxTimeMs: 14400000,
|
||||
Data: []byte("\000\001\200\350\335\r@\327p\000\000\000\000\000\000"),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in a new issue