mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Merge pull request #228 from Gouthamve/not-matchers
Select series with label unset for != and !~
This commit is contained in:
commit
239cbae154
8
block.go
8
block.go
|
@ -289,7 +289,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
|||
return ErrClosing
|
||||
}
|
||||
|
||||
p, absent, err := PostingsForMatchers(pb.indexr, ms...)
|
||||
p, err := PostingsForMatchers(pb.indexr, ms...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "select series")
|
||||
}
|
||||
|
@ -309,12 +309,6 @@ Outer:
|
|||
return err
|
||||
}
|
||||
|
||||
for _, abs := range absent {
|
||||
if lset.Get(abs) != "" {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
|
||||
for _, chk := range chks {
|
||||
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
||||
// Delete only until the current vlaues and not beyond.
|
||||
|
|
86
db_test.go
86
db_test.go
|
@ -807,3 +807,89 @@ func TestDB_Retention(t *testing.T) {
|
|||
testutil.Equals(t, 1, len(db.blocks))
|
||||
testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
|
||||
}
|
||||
|
||||
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
||||
tmpdir, _ := ioutil.TempDir("", "test")
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
db, err := Open(tmpdir, nil, nil, nil)
|
||||
testutil.Ok(t, err)
|
||||
defer db.Close()
|
||||
|
||||
labelpairs := []labels.Labels{
|
||||
labels.FromStrings("a", "abcd", "b", "abcde"),
|
||||
labels.FromStrings("labelname", "labelvalue"),
|
||||
}
|
||||
|
||||
app := db.Appender()
|
||||
for _, lbls := range labelpairs {
|
||||
_, err = app.Add(lbls, 0, 1)
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
||||
cases := []struct {
|
||||
selector labels.Selector
|
||||
series []labels.Labels
|
||||
}{{
|
||||
selector: labels.Selector{
|
||||
labels.Not(labels.NewEqualMatcher("lname", "lvalue")),
|
||||
},
|
||||
series: labelpairs,
|
||||
}, {
|
||||
selector: labels.Selector{
|
||||
labels.NewEqualMatcher("a", "abcd"),
|
||||
labels.Not(labels.NewEqualMatcher("b", "abcde")),
|
||||
},
|
||||
series: []labels.Labels{},
|
||||
}, {
|
||||
selector: labels.Selector{
|
||||
labels.NewEqualMatcher("a", "abcd"),
|
||||
labels.Not(labels.NewEqualMatcher("b", "abc")),
|
||||
},
|
||||
series: []labels.Labels{labelpairs[0]},
|
||||
}, {
|
||||
selector: labels.Selector{
|
||||
labels.Not(labels.NewMustRegexpMatcher("a", "abd.*")),
|
||||
},
|
||||
series: labelpairs,
|
||||
}, {
|
||||
selector: labels.Selector{
|
||||
labels.Not(labels.NewMustRegexpMatcher("a", "abc.*")),
|
||||
},
|
||||
series: labelpairs[1:],
|
||||
}, {
|
||||
selector: labels.Selector{
|
||||
labels.Not(labels.NewMustRegexpMatcher("c", "abd.*")),
|
||||
},
|
||||
series: labelpairs,
|
||||
}, {
|
||||
selector: labels.Selector{
|
||||
labels.Not(labels.NewMustRegexpMatcher("labelname", "labelvalue")),
|
||||
},
|
||||
series: labelpairs[:1],
|
||||
}}
|
||||
|
||||
q, err := db.Querier(0, 10)
|
||||
testutil.Ok(t, err)
|
||||
defer q.Close()
|
||||
|
||||
for _, c := range cases {
|
||||
ss, err := q.Select(c.selector...)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
lres, err := expandSeriesSet(ss)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
testutil.Equals(t, c.series, lres)
|
||||
}
|
||||
}
|
||||
|
||||
func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
|
||||
result := []labels.Labels{}
|
||||
for ss.Next() {
|
||||
result = append(result, ss.At().Labels())
|
||||
}
|
||||
|
||||
return result, ss.Err()
|
||||
}
|
||||
|
|
9
head.go
9
head.go
|
@ -574,23 +574,16 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
|||
|
||||
ir := h.indexRange(mint, maxt)
|
||||
|
||||
p, absent, err := PostingsForMatchers(ir, ms...)
|
||||
p, err := PostingsForMatchers(ir, ms...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "select series")
|
||||
}
|
||||
|
||||
var stones []Stone
|
||||
|
||||
Outer:
|
||||
for p.Next() {
|
||||
series := h.series.getByID(p.At())
|
||||
|
||||
for _, abs := range absent {
|
||||
if series.lset.Get(abs) != "" {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
|
||||
// Delete only until the current values and not beyond.
|
||||
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime())
|
||||
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
|
||||
|
|
|
@ -76,6 +76,18 @@ func NewRegexpMatcher(name, pattern string) (Matcher, error) {
|
|||
return ®expMatcher{name: name, re: re}, nil
|
||||
}
|
||||
|
||||
// NewRegexpMatcher returns a new matcher verifying that a value matches
|
||||
// the regular expression pattern. Will panic if the pattern is not a valid
|
||||
// regular expression.
|
||||
func NewMustRegexpMatcher(name, pattern string) Matcher {
|
||||
re, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ®expMatcher{name: name, re: re}
|
||||
|
||||
}
|
||||
|
||||
// notMatcher inverts the matching result for a matcher.
|
||||
type notMatcher struct {
|
||||
Matcher
|
||||
|
|
76
postings.go
76
postings.go
|
@ -259,7 +259,7 @@ func (it *intersectPostings) Err() error {
|
|||
// Merge returns a new iterator over the union of the input iterators.
|
||||
func Merge(its ...Postings) Postings {
|
||||
if len(its) == 0 {
|
||||
return nil
|
||||
return EmptyPostings()
|
||||
}
|
||||
if len(its) == 1 {
|
||||
return its[0]
|
||||
|
@ -340,6 +340,80 @@ func (it *mergedPostings) Err() error {
|
|||
return it.b.Err()
|
||||
}
|
||||
|
||||
type removedPostings struct {
|
||||
full, remove Postings
|
||||
|
||||
cur uint64
|
||||
|
||||
initialized bool
|
||||
fok, rok bool
|
||||
}
|
||||
|
||||
func newRemovedPostings(full, remove Postings) *removedPostings {
|
||||
return &removedPostings{
|
||||
full: full,
|
||||
remove: remove,
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *removedPostings) At() uint64 {
|
||||
return rp.cur
|
||||
}
|
||||
|
||||
func (rp *removedPostings) Next() bool {
|
||||
if !rp.initialized {
|
||||
rp.fok = rp.full.Next()
|
||||
rp.rok = rp.remove.Next()
|
||||
rp.initialized = true
|
||||
}
|
||||
|
||||
if !rp.fok {
|
||||
return false
|
||||
}
|
||||
|
||||
if !rp.rok {
|
||||
rp.cur = rp.full.At()
|
||||
rp.fok = rp.full.Next()
|
||||
return true
|
||||
}
|
||||
|
||||
fcur, rcur := rp.full.At(), rp.remove.At()
|
||||
if fcur < rcur {
|
||||
rp.cur = fcur
|
||||
rp.fok = rp.full.Next()
|
||||
|
||||
return true
|
||||
} else if rcur < fcur {
|
||||
// Forward the remove postings to the right position.
|
||||
rp.rok = rp.remove.Seek(fcur)
|
||||
} else {
|
||||
// Skip the current posting.
|
||||
rp.fok = rp.full.Next()
|
||||
}
|
||||
|
||||
return rp.Next()
|
||||
}
|
||||
|
||||
func (rp *removedPostings) Seek(id uint64) bool {
|
||||
if rp.cur >= id {
|
||||
return true
|
||||
}
|
||||
|
||||
rp.fok = rp.full.Seek(id)
|
||||
rp.rok = rp.remove.Seek(id)
|
||||
rp.initialized = true
|
||||
|
||||
return rp.Next()
|
||||
}
|
||||
|
||||
func (rp *removedPostings) Err() error {
|
||||
if rp.full.Err() != nil {
|
||||
return rp.full.Err()
|
||||
}
|
||||
|
||||
return rp.remove.Err()
|
||||
}
|
||||
|
||||
// listPostings implements the Postings interface over a plain list.
|
||||
type listPostings struct {
|
||||
list []uint64
|
||||
|
|
141
postings_test.go
141
postings_test.go
|
@ -301,6 +301,147 @@ func TestMergedPostingsSeek(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
func TestRemovedPostings(t *testing.T) {
|
||||
var cases = []struct {
|
||||
a, b []uint64
|
||||
res []uint64
|
||||
}{
|
||||
{
|
||||
a: nil,
|
||||
b: nil,
|
||||
res: []uint64(nil),
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4},
|
||||
b: nil,
|
||||
res: []uint64{1, 2, 3, 4},
|
||||
},
|
||||
{
|
||||
a: nil,
|
||||
b: []uint64{1, 2, 3, 4},
|
||||
res: []uint64(nil),
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 5},
|
||||
b: []uint64{6, 7, 8, 9, 10},
|
||||
res: []uint64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 5},
|
||||
b: []uint64{4, 5, 6, 7, 8},
|
||||
res: []uint64{1, 2, 3},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 9, 10},
|
||||
b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
|
||||
res: []uint64{2, 3, 9},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 9, 10},
|
||||
b: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
|
||||
res: []uint64(nil),
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
a := newListPostings(c.a)
|
||||
b := newListPostings(c.b)
|
||||
|
||||
res, err := expandPostings(newRemovedPostings(a, b))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, c.res, res)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRemovedPostingsSeek(t *testing.T) {
|
||||
var cases = []struct {
|
||||
a, b []uint64
|
||||
|
||||
seek uint64
|
||||
success bool
|
||||
res []uint64
|
||||
}{
|
||||
{
|
||||
a: []uint64{2, 3, 4, 5},
|
||||
b: []uint64{6, 7, 8, 9, 10},
|
||||
|
||||
seek: 1,
|
||||
success: true,
|
||||
res: []uint64{2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 5},
|
||||
b: []uint64{6, 7, 8, 9, 10},
|
||||
|
||||
seek: 2,
|
||||
success: true,
|
||||
res: []uint64{2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 5},
|
||||
b: []uint64{4, 5, 6, 7, 8},
|
||||
|
||||
seek: 9,
|
||||
success: false,
|
||||
res: nil,
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 9, 10},
|
||||
b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
|
||||
|
||||
seek: 10,
|
||||
success: false,
|
||||
res: nil,
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 9, 10},
|
||||
b: []uint64{1, 4, 5, 6, 7, 8, 11},
|
||||
|
||||
seek: 4,
|
||||
success: true,
|
||||
res: []uint64{9, 10},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 9, 10},
|
||||
b: []uint64{1, 4, 5, 6, 7, 8, 11},
|
||||
|
||||
seek: 5,
|
||||
success: true,
|
||||
res: []uint64{9, 10},
|
||||
},
|
||||
{
|
||||
a: []uint64{1, 2, 3, 4, 9, 10},
|
||||
b: []uint64{1, 4, 5, 6, 7, 8, 11},
|
||||
|
||||
seek: 10,
|
||||
success: true,
|
||||
res: []uint64{10},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
a := newListPostings(c.a)
|
||||
b := newListPostings(c.b)
|
||||
|
||||
p := newRemovedPostings(a, b)
|
||||
|
||||
testutil.Equals(t, c.success, p.Seek(c.seek))
|
||||
|
||||
// After Seek(), At() should be called.
|
||||
if c.success {
|
||||
start := p.At()
|
||||
lst, err := expandPostings(p)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
lst = append([]uint64{start}, lst...)
|
||||
testutil.Equals(t, c.res, lst)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestBigEndian(t *testing.T) {
|
||||
num := 1000
|
||||
// mock a list as postings
|
||||
|
|
72
querier.go
72
querier.go
|
@ -202,25 +202,18 @@ func (q *blockQuerier) Close() error {
|
|||
// PostingsForMatchers assembles a single postings iterator against the index reader
|
||||
// based on the given matchers. It returns a list of label names that must be manually
|
||||
// checked to not exist in series the postings list points to.
|
||||
func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
|
||||
func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, error) {
|
||||
var (
|
||||
its []Postings
|
||||
absent []string
|
||||
)
|
||||
for _, m := range ms {
|
||||
// If the matcher checks absence of a label, don't select them
|
||||
// but propagate the check into the series set.
|
||||
if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") {
|
||||
absent = append(absent, m.Name())
|
||||
continue
|
||||
}
|
||||
it, err := postingsForMatcher(index, m)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
its = append(its, it)
|
||||
}
|
||||
return index.SortedPostings(Intersect(its...)), absent, nil
|
||||
return index.SortedPostings(Intersect(its...)), nil
|
||||
}
|
||||
|
||||
// tuplesByPrefix uses binary search to find prefix matches within ts.
|
||||
|
@ -255,6 +248,13 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
|
|||
}
|
||||
|
||||
func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
|
||||
// If the matcher selects an empty value, it selects all the series which dont
|
||||
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
|
||||
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
|
||||
if m.Matches("") {
|
||||
return postingsForUnsetLabelMatcher(index, m)
|
||||
}
|
||||
|
||||
// Fast-path for equal matching.
|
||||
if em, ok := m.(*labels.EqualMatcher); ok {
|
||||
it, err := index.Postings(em.Name(), em.Value())
|
||||
|
@ -305,6 +305,43 @@ func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
|
|||
return Merge(rit...), nil
|
||||
}
|
||||
|
||||
func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
|
||||
tpls, err := index.LabelValues(m.Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res []string
|
||||
for i := 0; i < tpls.Len(); i++ {
|
||||
vals, err := tpls.At(i)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !m.Matches(vals[0]) {
|
||||
res = append(res, vals[0])
|
||||
}
|
||||
}
|
||||
|
||||
var rit []Postings
|
||||
for _, v := range res {
|
||||
it, err := index.Postings(m.Name(), v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rit = append(rit, it)
|
||||
}
|
||||
mrit := Merge(rit...)
|
||||
|
||||
allPostings, err := index.Postings(allPostingsKey.Name, allPostingsKey.Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newRemovedPostings(allPostings, mrit), nil
|
||||
}
|
||||
|
||||
func mergeStrings(a, b []string) []string {
|
||||
maxl := len(a)
|
||||
if len(b) > len(a) {
|
||||
|
@ -417,6 +454,8 @@ func (s *mergedSeriesSet) Next() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// ChunkSeriesSet exposes the chunks and intervals of a series instead of the
|
||||
// actual series itself.
|
||||
type ChunkSeriesSet interface {
|
||||
Next() bool
|
||||
At() (labels.Labels, []ChunkMeta, Intervals)
|
||||
|
@ -429,7 +468,6 @@ type baseChunkSeries struct {
|
|||
p Postings
|
||||
index IndexReader
|
||||
tombstones TombstoneReader
|
||||
absent []string // labels that must be unset in results.
|
||||
|
||||
lset labels.Labels
|
||||
chks []ChunkMeta
|
||||
|
@ -443,7 +481,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher)
|
|||
if tr == nil {
|
||||
tr = EmptyTombstoneReader()
|
||||
}
|
||||
p, absent, err := PostingsForMatchers(ir, ms...)
|
||||
p, err := PostingsForMatchers(ir, ms...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -451,7 +489,6 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher)
|
|||
p: p,
|
||||
index: ir,
|
||||
tombstones: tr,
|
||||
absent: absent,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -467,7 +504,7 @@ func (s *baseChunkSeries) Next() bool {
|
|||
chunks []ChunkMeta
|
||||
err error
|
||||
)
|
||||
Outer:
|
||||
|
||||
for s.p.Next() {
|
||||
ref := s.p.At()
|
||||
if err := s.index.Series(ref, &lset, &chunks); err != nil {
|
||||
|
@ -479,13 +516,6 @@ Outer:
|
|||
return false
|
||||
}
|
||||
|
||||
// If a series contains a label that must be absent, it is skipped as well.
|
||||
for _, abs := range s.absent {
|
||||
if lset.Get(abs) != "" {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
|
||||
s.lset = lset
|
||||
s.chks = chunks
|
||||
s.intervals, err = s.tombstones.Get(s.p.At())
|
||||
|
|
Loading…
Reference in a new issue