diff --git a/storage/remote/codec.go b/storage/remote/codec.go index c3e5403a5b..8f07eb0d1e 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -221,7 +221,6 @@ func StreamChunkedReadResponses( if len(chks) == 0 { break } - b, err := proto.Marshal(&prompb.ChunkedReadResponse{ ChunkedSeries: []*prompb.ChunkedSeries{ { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index c2d5b450fb..92526ab62b 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -64,6 +64,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func() // Do not close the test database by default as it will deadlock on test failures. return db, func() { + testutil.Ok(t, db.Close()) testutil.Ok(t, os.RemoveAll(tmpdir)) } } diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index 9489308b83..fbdf74d2bd 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -34,10 +34,10 @@ func New(t testutil.T) storage.Storage { // Tests just load data for a series sequentially. Thus we // need a long appendable window. - db, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ - MinBlockDuration: model.Duration(24 * time.Hour), - MaxBlockDuration: model.Duration(24 * time.Hour), - }) + opts := tsdb.DefaultOptions() + opts.MinBlockDuration = model.Duration(24 * time.Hour) + opts.MaxBlockDuration = model.Duration(24 * time.Hour) + db, err := tsdb.Open(dir, nil, nil, opts) if err != nil { t.Fatalf("Opening test storage failed: %s", err) } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 8370ec9f3c..e75cb53422 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -1189,11 +1189,19 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { for i, query := range req.Queries { err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { // The streaming API provides sorted series. - set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) + set, ws, err := querier.SelectSorted(selectParams, filteredMatchers...) if err != nil { return err } + if len(ws) > 0 { + msg := "" + for _, w := range ws { + msg += w.Error() + ";" + } + level.Warn(api.logger).Log("remote read warnings", "warnings", msg) + } + return remote.StreamChunkedReadResponses( remote.NewChunkedWriter(w, f), int64(i), @@ -1305,7 +1313,6 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error()) } }() - return seriesHandleFn(querier, selectParams, filteredMatchers) } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 104db08a9b..660afd92f9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1713,10 +1713,20 @@ func TestStreamReadEndpoint(t *testing.T) { matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") testutil.Ok(t, err) - query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) + query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{ + Step: 1, + Func: "avg", + Start: 0, + End: 14400001, + }) testutil.Ok(t, err) - query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{Step: 0, Func: "avg"}) + query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{ + Step: 1, + Func: "avg", + Start: 0, + End: 14400001, + }) testutil.Ok(t, err) req := &prompb.ReadRequest{