mirror of
synced 2025-02-21 03:16:00 -08:00
Improve findSetMatches
to support concatenation.
This improves how we find `SetMatches` for regexps. Notably, it adds support for concatenation such as `api_(v1|prom)_push`, for which the resulting set matches are `api_v1_push` and `api_prom_push`. I had to support character classes too, since the syntax package may optimize alternates into them. In the end the code is also more robust than the previous implementation, which relied on the stringified version of the regexp. This could be upstreamed later. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
This commit is contained in:
@ -111,10 +111,11 @@ func (m *Matcher) Inverse() (*Matcher, error) {
panic("labels.Matcher.Matches: invalid match type")
// GetRegexString returns the regex string.
func (m *Matcher) GetRegexString() string {
// SetMatches returns a set of equality matchers for the current regex matchers if possible.
// For example, the regexp `a(b|f)` will return "ab" and "af".
func (m *Matcher) SetMatches() []string {
if m.re == nil {
return ""
return nil
return m.re.GetRegexString()
return m.re.setMatches
@ -20,25 +20,28 @@ import (
type FastRegexMatcher struct {
re *regexp.Regexp
prefix string
suffix string
contains string
re *regexp.Regexp
setMatches []string
prefix string
suffix string
contains string
func NewFastRegexMatcher(v string) (*FastRegexMatcher, error) {
re, err := regexp.Compile("^(?:" + v + ")$")
if err != nil {
return nil, err
parsed, err := syntax.Parse(v, syntax.Perl)
if err != nil {
return nil, err
// Simplify the syntax tree to run faster.
parsed = parsed.Simplify()
re, err := regexp.Compile("^(?:" + parsed.String() + ")$")
if err != nil {
return nil, err
m := &FastRegexMatcher{
re: re,
re: re,
setMatches: findSetMatches(parsed, ""),
if parsed.Op == syntax.OpConcat {
@ -48,7 +51,132 @@ func NewFastRegexMatcher(v string) (*FastRegexMatcher, error) {
return m, nil
// findSetMatches extracts equality matches from a parsed regexp.
//
// base is the literal text accumulated from the sub-expressions located to the
// left of re; every returned string starts with it. The result is nil when the
// regexp cannot be replaced by a finite list of equality matches.
//
// Capture groups met along the way are unwrapped in place (see clearCapture),
// so the tree rooted at re may be modified.
func findSetMatches(re *syntax.Regexp, base string) []string {
	// Set matches are compared case-sensitively, so a case-insensitive node
	// can never be turned into equality matches.
	if isCaseInsensitive(re) {
		return nil
	}
	switch re.Op {
	case syntax.OpLiteral:
		return []string{base + string(re.Rune)}
	case syntax.OpEmptyMatch:
		// An empty match only contributes a value when it terminates a
		// non-empty prefix, e.g. the `(?:)` in `b(?:ar|(?:)|uzz)`.
		if base != "" {
			return []string{base}
		}
	case syntax.OpAlternate:
		return findSetMatchesFromAlternate(re, base)
	case syntax.OpCapture:
		if len(re.Sub) != 1 {
			return nil
		}
		// Unwrap the group before recursing: recursing on the untouched
		// capture node would hit this case again and never terminate.
		clearCapture(re)
		return findSetMatches(re, base)
	case syntax.OpConcat:
		return findSetMatchesFromConcat(re, base)
	case syntax.OpCharClass:
		return findSetMatchesFromCharClass(re, base)
	}
	return nil
}

// findSetMatchesFromCharClass expands a character class into one match per
// rune, each prefixed with base. It returns nil when base is empty, when the
// class is malformed, or when the class covers more than 100 runes.
func findSetMatchesFromCharClass(re *syntax.Regexp, base string) []string {
	// Beyond this many runes (e.g. a negated class such as [^0-9]) the
	// expansion is abandoned.
	const maxRunes = 100

	// A class without any literal prefix is not expanded.
	if base == "" {
		return nil
	}
	// re.Rune holds (lo, hi) pairs; anything else cannot be expanded safely.
	if len(re.Rune)%2 != 0 {
		return nil
	}
	total := 0
	for i := 0; i < len(re.Rune); i += 2 {
		lo, hi := re.Rune[i], re.Rune[i+1]
		if hi < lo {
			return nil
		}
		// Ranges are inclusive, hence the +1: [a-a] is one rune, not zero.
		total += int(hi-lo) + 1
		if total > maxRunes {
			return nil
		}
	}
	if total == 0 {
		return nil
	}
	matches := make([]string, 0, total)
	for i := 0; i < len(re.Rune); i += 2 {
		for c := re.Rune[i]; c <= re.Rune[i+1]; c++ {
			matches = append(matches, base+string(c))
		}
	}
	return matches
}

// findSetMatchesFromConcat computes the set matches of a concatenation by
// extending, for each sub-expression in order, every match found so far.
// It returns nil as soon as one sub-expression cannot be expanded.
func findSetMatchesFromConcat(re *syntax.Regexp, base string) []string {
	if isCaseInsensitive(re) {
		return nil
	}
	if len(re.Sub) == 0 {
		return nil
	}
	// Capture groups do not affect matching; strip them so each
	// sub-expression is examined in its plain form. Case-insensitivity of the
	// sub-expressions is checked by findSetMatches itself.
	clearCapture(re.Sub...)

	matches := findSetMatches(re.Sub[0], base)
	if matches == nil {
		return nil
	}
	for _, sub := range re.Sub[1:] {
		var extended []string
		for _, prefix := range matches {
			found := findSetMatches(sub, prefix)
			if found == nil {
				return nil
			}
			extended = append(extended, found...)
		}
		matches = extended
	}
	return matches
}

// findSetMatchesFromAlternate returns the set matches of every branch of an
// alternation, in branch order, or nil if any branch cannot be expanded.
func findSetMatchesFromAlternate(re *syntax.Regexp, base string) []string {
	var setMatches []string
	for _, sub := range re.Sub {
		found := findSetMatches(sub, base)
		if found == nil {
			return nil
		}
		setMatches = append(setMatches, found...)
	}
	return setMatches
}

// clearCapture removes capture operations, as they are not used for matching.
// Each capture node in regs is overwritten in place with its sub-expression;
// only one level of capture is removed per call.
func clearCapture(regs ...*syntax.Regexp) {
	for _, r := range regs {
		if r.Op == syntax.OpCapture {
			*r = *r.Sub[0]
		}
	}
}

// isCaseInsensitive tells if a regexp node is case insensitive.
// The flag has to be checked at each level of the syntax tree.
func isCaseInsensitive(reg *syntax.Regexp) bool {
	return (reg.Flags & syntax.FoldCase) != 0
}
func (m *FastRegexMatcher) MatchString(s string) bool {
if len(m.setMatches) != 0 {
for _, match := range m.setMatches {
if match == s {
return true
return false
if m.prefix != "" && !strings.HasPrefix(s, m.prefix) {
return false
@ -61,8 +189,8 @@ func (m *FastRegexMatcher) MatchString(s string) bool {
return m.re.MatchString(s)
// GetRegexString returns the string form of the compiled regexp held in m.re,
// as produced by its String method.
func (m *FastRegexMatcher) GetRegexString() string {
return m.re.String()
// SetMatches returns the setMatches field of the matcher as is (no copy is
// made). The slice is nil when no set matches are stored.
func (m *FastRegexMatcher) SetMatches() []string {
return m.setMatches
// optimizeConcatRegex returns literal prefix/suffix text that can be safely
@ -96,3 +96,55 @@ func TestOptimizeConcatRegex(t *testing.T) {
require.Equal(t, c.contains, contains)
// TestFindSetMatches parses each pattern with syntax.Perl flags and checks that
// findSetMatches, called with an empty base, returns exactly the expected list
// of equality matches (nil when none is expected).
// Refer to https://github.com/prometheus/prometheus/issues/2651.
func TestFindSetMatches(t *testing.T) {
for _, c := range []struct {
pattern string
exp []string
// Single value, coming from a `bar=~"foo"` selector.
{"foo", []string{"foo"}},
// Simple alternation of literals.
{"foo|bar|zz", []string{"foo", "bar", "zz"}},
// Alternation combined with concatenation (bar|baz is parsed as ba(r|z)).
{"foo|bar|baz", []string{"foo", "bar", "baz"}},
// Alternation combined with concatenation and a capture group.
{"foo|bar|baz|(zz)", []string{"foo", "bar", "baz", "zz"}},
// Alternation combined with concatenation and an empty match:
// parsed as b(ar|(?:)|uzz) where b(?:) means literal b.
{"bar|b|buzz", []string{"bar", "b", "buzz"}},
// Simple sets containing escaped characters.
{"fo\\.o|bar\\?|\\^baz", []string{"fo.o", "bar?", "^baz"}},
// Character classes whose low and high bounds differ => A(B[CD]|EF)|BC[XY]
{"ABC|ABD|AEF|BCX|BCY", []string{"ABC", "ABD", "AEF", "BCX", "BCY"}},
// Concatenation of three parts.
{"api_(v1|prom)_push", []string{"api_v1_push", "api_prom_push"}},
// Concatenation of several parts with multiple alternations.
{"(api|rpc)_(v1|prom)_push", []string{"api_v1_push", "api_prom_push", "rpc_v1_push", "rpc_prom_push"}},
{"(api|rpc)_(v1|prom)_(push|query)", []string{"api_v1_push", "api_v1_query", "api_prom_push", "api_prom_query", "rpc_v1_push", "rpc_v1_query", "rpc_prom_push", "rpc_prom_query"}},
// OpPlus inside a concatenation: no set matches expected.
{"(.+)/(foo|bar)", nil},
// Simple sets containing special characters without escaping.
{"fo.o|bar?|^baz", nil},
// Case-insensitive flag on a literal: no set matches expected.
{"(?i)foo", nil},
// Case-insensitive flag on an alternation: no set matches expected.
{"(?i)foo|bar|baz", nil},
// Case-insensitive flag inside a concatenation: no set matches expected.
{"(api|rpc)_(v1|prom)_((?i)push|query)", nil},
// Character class covering too many characters to expand.
{"(api|rpc)_[^0-9]", nil},
} {
// Shadow the loop variable so the subtest closure uses this iteration's case.
c := c
t.Run(c.pattern, func(t *testing.T) {
parsed, err := syntax.Parse(c.pattern, syntax.Perl)
require.NoError(t, err)
matches := findSetMatches(parsed, "")
require.Equal(t, c.exp, matches)
@ -16,7 +16,6 @@ package tsdb
import (
@ -180,48 +179,6 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
func findSetMatches(pattern string) []string {
// Return empty matches if the wrapper from Prometheus is missing.
if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
return nil
escaped := false
sets := []*strings.Builder{{}}
for i := 4; i < len(pattern)-2; i++ {
if escaped {
switch {
case isRegexMetaCharacter(pattern[i]):
case pattern[i] == '\\':
return nil
escaped = false
} else {
switch {
case isRegexMetaCharacter(pattern[i]):
if pattern[i] == '|' {
sets = append(sets, &strings.Builder{})
} else {
return nil
case pattern[i] == '\\':
escaped = true
matches := make([]string, 0, len(sets))
for _, s := range sets {
if s.Len() > 0 {
matches = append(matches, s.String())
return matches
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
@ -316,7 +273,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
// Fast-path for set matching.
if m.Type == labels.MatchRegexp {
setMatches := findSetMatches(m.GetRegexString())
setMatches := m.SetMatches()
if len(setMatches) > 0 {
return ix.Postings(m.Name, setMatches...)
@ -612,6 +569,7 @@ func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
// toSeriesIterator wraps p in a populateWithDelSeriesIterator and returns it
// as a chunkenc.Iterator. The wrapper embeds p itself, not a copy.
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
// toChunkSeriesIterator wraps p in a populateWithDelChunkSeriesIterator and
// returns it as a chunks.Iterator. The wrapper embeds p itself, not a copy.
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
@ -881,7 +839,6 @@ Outer:
if ts <= tr.Maxt {
return true
it.Intervals = it.Intervals[1:]
@ -918,7 +918,7 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
// The subset are all equivalent so this does not capture merging of partial or non-overlapping sets well.
// TODO(bwplotka): Merge with storage merged series set benchmark.
func BenchmarkMergedSeriesSet(b *testing.B) {
var sel = func(sets []storage.SeriesSet) storage.SeriesSet {
sel := func(sets []storage.SeriesSet) storage.SeriesSet {
return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
@ -1560,69 +1560,6 @@ func BenchmarkSetMatcher(b *testing.B) {
// TestFindSetMatches runs the string-based findSetMatches over a table of
// patterns and compares the returned matches, element by element, with the
// expected ones. Patterns are given with the `^(?:` ... `)$` wrapper.
// Refer to https://github.com/prometheus/prometheus/issues/2651.
func TestFindSetMatches(t *testing.T) {
cases := []struct {
pattern string
exp []string
// Single value, coming from a `bar=~"foo"` selector.
pattern: "^(?:foo)$",
exp: []string{
// Simple sets.
pattern: "^(?:foo|bar|baz)$",
exp: []string{
// Simple sets containing escaped characters.
pattern: "^(?:fo\\.o|bar\\?|\\^baz)$",
exp: []string{
// Simple sets containing special characters without escaping.
pattern: "^(?:fo.o|bar?|^baz)$",
exp: nil,
// Missing wrapper.
pattern: "foo|bar|baz",
exp: nil,
for _, c := range cases {
matches := findSetMatches(c.pattern)
// No expected matches: any non-empty result is an error.
if len(c.exp) == 0 {
if len(matches) != 0 {
t.Errorf("Evaluating %s, unexpected result %v", c.pattern, matches)
} else {
// Compare lengths first so the element-wise loop cannot index out of range.
if len(matches) != len(c.exp) {
t.Errorf("Evaluating %s, length of result not equal to exp", c.pattern)
} else {
for i := 0; i < len(c.exp); i++ {
if c.exp[i] != matches[i] {
t.Errorf("Evaluating %s, unexpected result %s", c.pattern, matches[i])
func TestPostingsForMatchers(t *testing.T) {
chunkDir, err := ioutil.TempDir("", "chunk_dir")
require.NoError(t, err)
@ -1881,7 +1818,6 @@ func TestPostingsForMatchers(t *testing.T) {
t.Errorf("Evaluating %v, missing results %+v", c.matchers, exp)
// TestClose ensures that calling Close more than once doesn't block and doesn't panic.
@ -2141,7 +2077,12 @@ func TestBlockBaseSeriesSet(t *testing.T) {
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}}...),
chunks: []chunks.Meta{
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
{Ref: 29},
{Ref: 45},
{Ref: 245},
{Ref: 123},
{Ref: 4232},
{Ref: 5344},
{Ref: 121},
ref: 12,
Reference in a new issue