mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
storage/remote: add test showing bug #12605
Add test for bug #12605 for starters. Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
This commit is contained in:
parent
7c75d233d0
commit
76cb4f7fd1
|
@ -18,8 +18,10 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -195,6 +197,103 @@ func BenchmarkStreamReadEndpoint(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
type chunkedResponses []*prompb.ChunkedReadResponse
|
||||
|
||||
var _ = (sort.Interface)(&chunkedResponses{})
|
||||
|
||||
func (r chunkedResponses) Len() int {
|
||||
return len(r)
|
||||
}
|
||||
|
||||
func (r chunkedResponses) Less(i, j int) bool {
|
||||
lblsi := map[string]string{}
|
||||
for _, li := range r[i].ChunkedSeries[0].Labels {
|
||||
lblsi[li.Name] = li.Value
|
||||
}
|
||||
|
||||
lblsj := map[string]string{}
|
||||
for _, lj := range r[j].ChunkedSeries[0].Labels {
|
||||
lblsj[lj.Name] = lj.Value
|
||||
}
|
||||
|
||||
return labels.Compare(labels.FromMap(lblsi), labels.FromMap(lblsj)) < 0
|
||||
}
|
||||
|
||||
func (r chunkedResponses) Swap(i, j int) {
|
||||
r[i], r[j] = r[j], r[i]
|
||||
}
|
||||
|
||||
// Test for https://github.com/prometheus/prometheus/issues/12605.
|
||||
func TestRemoteReadSortedResult(t *testing.T) {
|
||||
store := promql.LoadedStorage(t, `
|
||||
load 1m
|
||||
test_metric1{a="1", b="2", c="3"} 0+100x119
|
||||
test_metric1{a="1", b="2", c="3", d="4"} 0+100x120
|
||||
test_metric1{a="1", c="5"} 0+100x120
|
||||
test_metric1{a="1", c="6"} 0+100x120
|
||||
`)
|
||||
defer store.Close()
|
||||
|
||||
api := NewReadHandler(nil, nil, store, func() config.Config {
|
||||
return config.Config{
|
||||
GlobalConfig: config.GlobalConfig{
|
||||
ExternalLabels: labels.FromStrings("b", "1"),
|
||||
},
|
||||
}
|
||||
},
|
||||
1e6, 1, math.MaxInt,
|
||||
)
|
||||
|
||||
// Encode the request.
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, "a", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
query, err := ToQuery(0, 14400001, []*labels.Matcher{matcher}, &storage.SelectHints{
|
||||
Step: 1,
|
||||
Func: "avg",
|
||||
Start: 0,
|
||||
End: 14400001,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
req := &prompb.ReadRequest{
|
||||
Queries: []*prompb.Query{query},
|
||||
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
|
||||
}
|
||||
data, err := proto.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
compressed := snappy.Encode(nil, data)
|
||||
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
||||
require.NoError(t, err)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
api.ServeHTTP(recorder, request)
|
||||
|
||||
require.Equal(t, 2, recorder.Code/100)
|
||||
|
||||
require.Equal(t, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", recorder.Result().Header.Get("Content-Type"))
|
||||
require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding"))
|
||||
|
||||
var results chunkedResponses
|
||||
stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil)
|
||||
for {
|
||||
res := &prompb.ChunkedReadResponse{}
|
||||
err := stream.NextProto(res)
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
results = append(results, res)
|
||||
}
|
||||
|
||||
require.Equal(t, 4, len(results))
|
||||
|
||||
// NOTE(GiedriusS): with https://github.com/prometheus/prometheus/issues/12605 fixed
|
||||
// this should return true.
|
||||
require.Equal(t, false, sort.IsSorted(results))
|
||||
}
|
||||
|
||||
func TestStreamReadEndpoint(t *testing.T) {
|
||||
// First with 120 float samples. We expect 1 frame with 1 chunk.
|
||||
// Second with 121 float samples, We expect 1 frame with 2 chunks.
|
||||
|
|
Loading…
Reference in a new issue