diff --git a/tsdb/querier.go b/tsdb/querier.go index 72b6b51414..4342d60985 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -21,6 +21,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "golang.org/x/exp/slices" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -244,6 +245,41 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, labelMustBeSet[m.Name] = true } } + isSubtractingMatcher := func(m *labels.Matcher) bool { + if !labelMustBeSet[m.Name] { + return true + } + return (m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp) && m.Matches("") + } + someSubtractingMatchers, someIntersectingMatchers := false, false + for _, m := range ms { + if isSubtractingMatcher(m) { + someSubtractingMatchers = true + } else { + someIntersectingMatchers = true + } + } + + if someSubtractingMatchers && !someIntersectingMatchers { + // If there's nothing to subtract from, add in everything and remove the notIts later. + // We prefer to get AllPostings so that the base of subtraction (i.e. allPostings) + // doesn't include series that may be added to the index reader during this function call. + k, v := index.AllPostingsKey() + allPostings, err := ix.Postings(k, v) + if err != nil { + return nil, err + } + its = append(its, allPostings) + } + + // Sort matchers to have the intersecting matchers first. + // This way the base for subtraction is smaller and + // there is no chance that the set we subtract from + // contains postings of series that didn't exist when + // we constructed the set we subtract by. + slices.SortStableFunc(ms, func(i, j *labels.Matcher) bool { + return !isSubtractingMatcher(i) && isSubtractingMatcher(j) + }) for _, m := range ms { switch { @@ -312,16 +348,6 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, } } - // If there's nothing to subtract from, add in everything and remove the notIts later. - if len(its) == 0 && len(notIts) != 0 { - k, v := index.AllPostingsKey() - allPostings, err := ix.Postings(k, v) - if err != nil { - return nil, err - } - its = append(its, allPostings) - } - it := index.Intersect(its...) for _, n := range notIts { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index e9dd3b75f4..8cfd5d1412 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -22,6 +22,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "testing" "time" @@ -2194,6 +2195,71 @@ func TestPostingsForMatchers(t *testing.T) { } } +// TestQuerierIndexQueriesRace tests the index queries with racing appends. +func TestQuerierIndexQueriesRace(t *testing.T) { + const testRepeats = 1000 + + testCases := []struct { + matchers []*labels.Matcher + }{ + { + matchers: []*labels.Matcher{ + // This matcher should involve the AllPostings posting list in calculating the posting lists. + labels.MustNewMatcher(labels.MatchNotEqual, labels.MetricName, "metric"), + }, + }, + { + matchers: []*labels.Matcher{ + // The first matcher should be effectively the same as AllPostings, because all series have m=0 + // If it is evaluated first, then __name__=metric will contain more series than m=0. + labels.MustNewMatcher(labels.MatchNotEqual, "m", "0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric"), + }, + }, + } + + for _, c := range testCases { + c := c + t.Run(fmt.Sprintf("%v", c.matchers), func(t *testing.T) { + db := openTestDB(t, DefaultOptions(), nil) + h := db.Head() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go appendSeries(t, ctx, wg, h) + t.Cleanup(wg.Wait) + t.Cleanup(cancel) + + for i := 0; i < testRepeats; i++ { + q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + values, _, err := q.LabelValues("n", c.matchers...) + require.NoError(t, err) + require.Emptyf(t, values, `label values for label "n" should be empty`) + } + }) + } +} + +func appendSeries(t *testing.T, ctx context.Context, wg *sync.WaitGroup, h *Head) { + defer wg.Done() + + for i := 0; ctx.Err() != nil; i++ { + app := h.Appender(context.Background()) + _, err := app.Append(0, labels.FromStrings(labels.MetricName, "metric", "n", strconv.Itoa(i), "m", "0"), 0, 0) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + + // Throttle down the appends to keep the test somewhat nimble. + time.Sleep(time.Millisecond) + } +} + // TestClose ensures that calling Close more than once doesn't block and doesn't panic. func TestClose(t *testing.T) { dir := t.TempDir()