diff --git a/block.go b/block.go index 4cb3c048f..48f768223 100644 --- a/block.go +++ b/block.go @@ -289,7 +289,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { return ErrClosing } - p, absent, err := PostingsForMatchers(pb.indexr, ms...) + p, err := PostingsForMatchers(pb.indexr, ms...) if err != nil { return errors.Wrap(err, "select series") } @@ -309,12 +309,6 @@ Outer: return err } - for _, abs := range absent { - if lset.Get(abs) != "" { - continue Outer - } - } - for _, chk := range chks { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { // Delete only until the current vlaues and not beyond. diff --git a/db_test.go b/db_test.go index 69e373de6..b40d7ecf1 100644 --- a/db_test.go +++ b/db_test.go @@ -807,3 +807,89 @@ func TestDB_Retention(t *testing.T) { testutil.Equals(t, 1, len(db.blocks)) testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. } + +func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + testutil.Ok(t, err) + defer db.Close() + + labelpairs := []labels.Labels{ + labels.FromStrings("a", "abcd", "b", "abcde"), + labels.FromStrings("labelname", "labelvalue"), + } + + app := db.Appender() + for _, lbls := range labelpairs { + _, err = app.Add(lbls, 0, 1) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + cases := []struct { + selector labels.Selector + series []labels.Labels + }{{ + selector: labels.Selector{ + labels.Not(labels.NewEqualMatcher("lname", "lvalue")), + }, + series: labelpairs, + }, { + selector: labels.Selector{ + labels.NewEqualMatcher("a", "abcd"), + labels.Not(labels.NewEqualMatcher("b", "abcde")), + }, + series: []labels.Labels{}, + }, { + selector: labels.Selector{ + labels.NewEqualMatcher("a", "abcd"), + labels.Not(labels.NewEqualMatcher("b", "abc")), + }, + series: []labels.Labels{labelpairs[0]}, + }, { + selector: labels.Selector{ + labels.Not(labels.NewMustRegexpMatcher("a", "abd.*")), + }, + series: labelpairs, + }, { + selector: labels.Selector{ + labels.Not(labels.NewMustRegexpMatcher("a", "abc.*")), + }, + series: labelpairs[1:], + }, { + selector: labels.Selector{ + labels.Not(labels.NewMustRegexpMatcher("c", "abd.*")), + }, + series: labelpairs, + }, { + selector: labels.Selector{ + labels.Not(labels.NewMustRegexpMatcher("labelname", "labelvalue")), + }, + series: labelpairs[:1], + }} + + q, err := db.Querier(0, 10) + testutil.Ok(t, err) + defer q.Close() + + for _, c := range cases { + ss, err := q.Select(c.selector...) + testutil.Ok(t, err) + + lres, err := expandSeriesSet(ss) + testutil.Ok(t, err) + + testutil.Equals(t, c.series, lres) + } +} + +func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) { + result := []labels.Labels{} + for ss.Next() { + result = append(result, ss.At().Labels()) + } + + return result, ss.Err() +} diff --git a/head.go b/head.go index d149cb1d9..efebc95ac 100644 --- a/head.go +++ b/head.go @@ -574,23 +574,16 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := h.indexRange(mint, maxt) - p, absent, err := PostingsForMatchers(ir, ms...) + p, err := PostingsForMatchers(ir, ms...) if err != nil { return errors.Wrap(err, "select series") } var stones []Stone -Outer: for p.Next() { series := h.series.getByID(p.At()) - for _, abs := range absent { - if series.lset.Get(abs) != "" { - continue Outer - } - } - // Delete only until the current values and not beyond. t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime()) stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) diff --git a/labels/selector.go b/labels/selector.go index 4f29452cf..7bc452faa 100644 --- a/labels/selector.go +++ b/labels/selector.go @@ -76,6 +76,18 @@ func NewRegexpMatcher(name, pattern string) (Matcher, error) { return ®expMatcher{name: name, re: re}, nil } +// NewRegexpMatcher returns a new matcher verifying that a value matches +// the regular expression pattern. Will panic if the pattern is not a valid +// regular expression. +func NewMustRegexpMatcher(name, pattern string) Matcher { + re, err := regexp.Compile(pattern) + if err != nil { + panic(err) + } + return ®expMatcher{name: name, re: re} + +} + // notMatcher inverts the matching result for a matcher. type notMatcher struct { Matcher diff --git a/postings.go b/postings.go index 1ebc7c576..200917e13 100644 --- a/postings.go +++ b/postings.go @@ -259,7 +259,7 @@ func (it *intersectPostings) Err() error { // Merge returns a new iterator over the union of the input iterators. func Merge(its ...Postings) Postings { if len(its) == 0 { - return nil + return EmptyPostings() } if len(its) == 1 { return its[0] @@ -340,6 +340,80 @@ func (it *mergedPostings) Err() error { return it.b.Err() } +type removedPostings struct { + full, remove Postings + + cur uint64 + + initialized bool + fok, rok bool +} + +func newRemovedPostings(full, remove Postings) *removedPostings { + return &removedPostings{ + full: full, + remove: remove, + } +} + +func (rp *removedPostings) At() uint64 { + return rp.cur +} + +func (rp *removedPostings) Next() bool { + if !rp.initialized { + rp.fok = rp.full.Next() + rp.rok = rp.remove.Next() + rp.initialized = true + } + + if !rp.fok { + return false + } + + if !rp.rok { + rp.cur = rp.full.At() + rp.fok = rp.full.Next() + return true + } + + fcur, rcur := rp.full.At(), rp.remove.At() + if fcur < rcur { + rp.cur = fcur + rp.fok = rp.full.Next() + + return true + } else if rcur < fcur { + // Forward the remove postings to the right position. + rp.rok = rp.remove.Seek(fcur) + } else { + // Skip the current posting. + rp.fok = rp.full.Next() + } + + return rp.Next() +} + +func (rp *removedPostings) Seek(id uint64) bool { + if rp.cur >= id { + return true + } + + rp.fok = rp.full.Seek(id) + rp.rok = rp.remove.Seek(id) + rp.initialized = true + + return rp.Next() +} + +func (rp *removedPostings) Err() error { + if rp.full.Err() != nil { + return rp.full.Err() + } + + return rp.remove.Err() +} + // listPostings implements the Postings interface over a plain list. type listPostings struct { list []uint64 diff --git a/postings_test.go b/postings_test.go index b75f3bcf4..abaf1b054 100644 --- a/postings_test.go +++ b/postings_test.go @@ -301,6 +301,147 @@ func TestMergedPostingsSeek(t *testing.T) { return } +func TestRemovedPostings(t *testing.T) { + var cases = []struct { + a, b []uint64 + res []uint64 + }{ + { + a: nil, + b: nil, + res: []uint64(nil), + }, + { + a: []uint64{1, 2, 3, 4}, + b: nil, + res: []uint64{1, 2, 3, 4}, + }, + { + a: nil, + b: []uint64{1, 2, 3, 4}, + res: []uint64(nil), + }, + { + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, + res: []uint64{1, 2, 3, 4, 5}, + }, + { + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{4, 5, 6, 7, 8}, + res: []uint64{1, 2, 3}, + }, + { + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 10, 11}, + res: []uint64{2, 3, 9}, + }, + { + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + res: []uint64(nil), + }, + } + + for _, c := range cases { + a := newListPostings(c.a) + b := newListPostings(c.b) + + res, err := expandPostings(newRemovedPostings(a, b)) + testutil.Ok(t, err) + testutil.Equals(t, c.res, res) + } + +} + +func TestRemovedPostingsSeek(t *testing.T) { + var cases = []struct { + a, b []uint64 + + seek uint64 + success bool + res []uint64 + }{ + { + a: []uint64{2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, + + seek: 1, + success: true, + res: []uint64{2, 3, 4, 5}, + }, + { + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, + + seek: 2, + success: true, + res: []uint64{2, 3, 4, 5}, + }, + { + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{4, 5, 6, 7, 8}, + + seek: 9, + success: false, + res: nil, + }, + { + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 10, 11}, + + seek: 10, + success: false, + res: nil, + }, + { + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 11}, + + seek: 4, + success: true, + res: []uint64{9, 10}, + }, + { + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 11}, + + seek: 5, + success: true, + res: []uint64{9, 10}, + }, + { + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 11}, + + seek: 10, + success: true, + res: []uint64{10}, + }, + } + + for _, c := range cases { + a := newListPostings(c.a) + b := newListPostings(c.b) + + p := newRemovedPostings(a, b) + + testutil.Equals(t, c.success, p.Seek(c.seek)) + + // After Seek(), At() should be called. + if c.success { + start := p.At() + lst, err := expandPostings(p) + testutil.Ok(t, err) + + lst = append([]uint64{start}, lst...) + testutil.Equals(t, c.res, lst) + } + } + + return +} + func TestBigEndian(t *testing.T) { num := 1000 // mock a list as postings diff --git a/querier.go b/querier.go index 37672c715..a326699b5 100644 --- a/querier.go +++ b/querier.go @@ -202,25 +202,18 @@ func (q *blockQuerier) Close() error { // PostingsForMatchers assembles a single postings iterator against the index reader // based on the given matchers. It returns a list of label names that must be manually // checked to not exist in series the postings list points to. -func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) { +func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, error) { var ( - its []Postings - absent []string + its []Postings ) for _, m := range ms { - // If the matcher checks absence of a label, don't select them - // but propagate the check into the series set. - if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") { - absent = append(absent, m.Name()) - continue - } it, err := postingsForMatcher(index, m) if err != nil { - return nil, nil, err + return nil, err } its = append(its, it) } - return index.SortedPostings(Intersect(its...)), absent, nil + return index.SortedPostings(Intersect(its...)), nil } // tuplesByPrefix uses binary search to find prefix matches within ts. @@ -255,6 +248,13 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) } func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { + // If the matcher selects an empty value, it selects all the series which dont + // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 + // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + if m.Matches("") { + return postingsForUnsetLabelMatcher(index, m) + } + // Fast-path for equal matching. if em, ok := m.(*labels.EqualMatcher); ok { it, err := index.Postings(em.Name(), em.Value()) @@ -305,6 +305,43 @@ func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { return Merge(rit...), nil } +func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings, error) { + tpls, err := index.LabelValues(m.Name()) + if err != nil { + return nil, err + } + + var res []string + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + return nil, err + } + + if !m.Matches(vals[0]) { + res = append(res, vals[0]) + } + } + + var rit []Postings + for _, v := range res { + it, err := index.Postings(m.Name(), v) + if err != nil { + return nil, err + } + + rit = append(rit, it) + } + mrit := Merge(rit...) + + allPostings, err := index.Postings(allPostingsKey.Name, allPostingsKey.Value) + if err != nil { + return nil, err + } + + return newRemovedPostings(allPostings, mrit), nil +} + func mergeStrings(a, b []string) []string { maxl := len(a) if len(b) > len(a) { @@ -417,6 +454,8 @@ func (s *mergedSeriesSet) Next() bool { return true } +// ChunkSeriesSet exposes the chunks and intervals of a series instead of the +// actual series itself. type ChunkSeriesSet interface { Next() bool At() (labels.Labels, []ChunkMeta, Intervals) @@ -429,7 +468,6 @@ type baseChunkSeries struct { p Postings index IndexReader tombstones TombstoneReader - absent []string // labels that must be unset in results. lset labels.Labels chks []ChunkMeta @@ -443,7 +481,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) if tr == nil { tr = EmptyTombstoneReader() } - p, absent, err := PostingsForMatchers(ir, ms...) + p, err := PostingsForMatchers(ir, ms...) if err != nil { return nil, err } @@ -451,7 +489,6 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) p: p, index: ir, tombstones: tr, - absent: absent, }, nil } @@ -467,7 +504,7 @@ func (s *baseChunkSeries) Next() bool { chunks []ChunkMeta err error ) -Outer: + for s.p.Next() { ref := s.p.At() if err := s.index.Series(ref, &lset, &chunks); err != nil { @@ -479,13 +516,6 @@ Outer: return false } - // If a series contains a label that must be absent, it is skipped as well. - for _, abs := range s.absent { - if lset.Get(abs) != "" { - continue Outer - } - } - s.lset = lset s.chks = chunks s.intervals, err = s.tombstones.Get(s.p.At())