From eb523a6b29469d2753f56e9a92611a19161d1bc2 Mon Sep 17 00:00:00 2001 From: machine424 Date: Wed, 25 Sep 2024 20:02:52 +0200 Subject: [PATCH 1/2] fix(storage/mergeQuerier): add a reproducer for data race that occurs when one of the queriers alters the passed matchers and propose a fix Signed-off-by: machine424 --- storage/merge.go | 8 ++++++++ tsdb/querier_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/storage/merge.go b/storage/merge.go index 2424b26ab7..b6980fb2f9 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -153,13 +153,21 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints ) // Schedule all Selects for all queriers we know about. for _, querier := range q.queriers { + // copy the matchers as some queriers may alter the slice. + // See https://github.com/prometheus/prometheus/issues/14723 + // matchersCopy := make([]*labels.Matcher, len(matchers)) + // copy(matchersCopy, matchers) + wg.Add(1) go func(qr genericQuerier) { + // go func(qr genericQuerier, m []*labels.Matcher) { defer wg.Done() // We need to sort for NewMergeSeriesSet to work. + // seriesSetChan <- qr.Select(ctx, true, hints, m...) seriesSetChan <- qr.Select(ctx, true, hints, matchers...) }(querier) + // }(querier, matchersCopy) } go func() { wg.Wait() diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 77772937a7..c52d6fed9e 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -3787,3 +3787,29 @@ func (m mockReaderOfLabels) Series(storage.SeriesRef, *labels.ScratchBuilder, *[ func (m mockReaderOfLabels) Symbols() index.StringIter { panic("Series called") } + +// TestMergeQuerierConcurrentSelectMatchers reproduces the data race bug from +// https://github.com/prometheus/prometheus/issues/14723, when one of the queriers (blockQuerier in this case) +// alters the passed matchers. 
+func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) { + block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil) + require.NoError(t, err) + p, err := NewBlockQuerier(block, 0, 1) + require.NoError(t, err) + + // A secondary querier is required to enable concurrent select; a blockQuerier is used for simplicity. + s, err := NewBlockQuerier(block, 0, 1) + require.NoError(t, err) + + originalMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "baz", ".*"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + } + matchers := append([]*labels.Matcher{}, originalMatchers...) + + mergedQuerier := storage.NewMergeQuerier([]storage.Querier{p}, []storage.Querier{s}, storage.ChainedSeriesMerge) + defer mergedQuerier.Close() + mergedQuerier.Select(context.Background(), false, nil, matchers...) + + require.Equal(t, originalMatchers, matchers) +} From cebcdce78a7412c8821e9b1e794f0c2b5e714043 Mon Sep 17 00:00:00 2001 From: machine424 Date: Fri, 27 Sep 2024 16:03:50 +0200 Subject: [PATCH 2/2] fix(storage/mergeQuerier): copy the matchers slice before passing it to queriers as some of them may alter it. Signed-off-by: machine424 --- storage/merge.go | 13 +++++-------- tsdb/querier_test.go | 8 +++++++- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index b6980fb2f9..a4d0934b16 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -155,19 +155,16 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints for _, querier := range q.queriers { // copy the matchers as some queriers may alter the slice. 
// See https://github.com/prometheus/prometheus/issues/14723 - // matchersCopy := make([]*labels.Matcher, len(matchers)) - // copy(matchersCopy, matchers) + matchersCopy := make([]*labels.Matcher, len(matchers)) + copy(matchersCopy, matchers) wg.Add(1) - go func(qr genericQuerier) { - // go func(qr genericQuerier, m []*labels.Matcher) { + go func(qr genericQuerier, m []*labels.Matcher) { defer wg.Done() // We need to sort for NewMergeSeriesSet to work. - // seriesSetChan <- qr.Select(ctx, true, hints, m...) - seriesSetChan <- qr.Select(ctx, true, hints, matchers...) - }(querier) - // }(querier, matchersCopy) + seriesSetChan <- qr.Select(ctx, true, hints, m...) + }(querier, matchersCopy) } go func() { wg.Wait() diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index c52d6fed9e..aca6c845b1 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -3794,6 +3794,9 @@ func (m mockReaderOfLabels) Symbols() index.StringIter { func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) { block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil) require.NoError(t, err) + defer func() { + require.NoError(t, block.Close()) + }() p, err := NewBlockQuerier(block, 0, 1) require.NoError(t, err) @@ -3808,7 +3811,10 @@ func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) { matchers := append([]*labels.Matcher{}, originalMatchers...) mergedQuerier := storage.NewMergeQuerier([]storage.Querier{p}, []storage.Querier{s}, storage.ChainedSeriesMerge) - defer mergedQuerier.Close() + defer func() { + require.NoError(t, mergedQuerier.Close()) + }() + mergedQuerier.Select(context.Background(), false, nil, matchers...) require.Equal(t, originalMatchers, matchers)