Fixed second bug.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-02-10 20:43:50 +00:00
parent aadffd1360
commit fb79f515fc
5 changed files with 26 additions and 9 deletions

View file

@ -221,7 +221,6 @@ func StreamChunkedReadResponses(
if len(chks) == 0 { if len(chks) == 0 {
break break
} }
b, err := proto.Marshal(&prompb.ChunkedReadResponse{ b, err := proto.Marshal(&prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{ ChunkedSeries: []*prompb.ChunkedSeries{
{ {

View file

@ -64,6 +64,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func()
// Do not close the test database by default as it will deadlock on test failures. // Do not close the test database by default as it will deadlock on test failures.
return db, func() { return db, func() {
testutil.Ok(t, db.Close())
testutil.Ok(t, os.RemoveAll(tmpdir)) testutil.Ok(t, os.RemoveAll(tmpdir))
} }
} }

View file

@ -34,10 +34,10 @@ func New(t testutil.T) storage.Storage {
// Tests just load data for a series sequentially. Thus we // Tests just load data for a series sequentially. Thus we
// need a long appendable window. // need a long appendable window.
db, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ opts := tsdb.DefaultOptions()
MinBlockDuration: model.Duration(24 * time.Hour), opts.MinBlockDuration = model.Duration(24 * time.Hour)
MaxBlockDuration: model.Duration(24 * time.Hour), opts.MaxBlockDuration = model.Duration(24 * time.Hour)
}) db, err := tsdb.Open(dir, nil, nil, opts)
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)
} }

View file

@ -1189,11 +1189,19 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
for i, query := range req.Queries { for i, query := range req.Queries {
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
// The streaming API provides sorted series. // The streaming API provides sorted series.
set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) set, ws, err := querier.SelectSorted(selectParams, filteredMatchers...)
if err != nil { if err != nil {
return err return err
} }
if len(ws) > 0 {
msg := ""
for _, w := range ws {
msg += w.Error() + ";"
}
level.Warn(api.logger).Log("remote read warnings", "warnings", msg)
}
return remote.StreamChunkedReadResponses( return remote.StreamChunkedReadResponses(
remote.NewChunkedWriter(w, f), remote.NewChunkedWriter(w, f),
int64(i), int64(i),
@ -1305,7 +1313,6 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern
level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error()) level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
} }
}() }()
return seriesHandleFn(querier, selectParams, filteredMatchers) return seriesHandleFn(querier, selectParams, filteredMatchers)
} }

View file

@ -1713,10 +1713,20 @@ func TestStreamReadEndpoint(t *testing.T) {
matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1")
testutil.Ok(t, err) testutil.Ok(t, err)
query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{
Step: 1,
Func: "avg",
Start: 0,
End: 14400001,
})
testutil.Ok(t, err) testutil.Ok(t, err)
query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{Step: 0, Func: "avg"}) query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{
Step: 1,
Func: "avg",
Start: 0,
End: 14400001,
})
testutil.Ok(t, err) testutil.Ok(t, err)
req := &prompb.ReadRequest{ req := &prompb.ReadRequest{