diff --git a/block.go b/block.go index 034f336730..f2fcf97ff2 100644 --- a/block.go +++ b/block.go @@ -285,7 +285,10 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { } pr := newPostingsReader(pb.indexr) - p, absent := pr.Select(ms...) + p, absent, err := pr.Select(ms...) + if err != nil { + return errors.Wrap(err, "select series") + } ir := pb.indexr diff --git a/db_test.go b/db_test.go index 9f91705e7e..8eb676af8f 100644 --- a/db_test.go +++ b/db_test.go @@ -36,8 +36,11 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { return db, func() { os.RemoveAll(tmpdir) } } -// Convert a SeriesSet into a form useable with reflect.DeepEqual. -func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample { +// query runs a matcher query against the querier and fully expands its data. +func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sample { + ss, err := q.Select(matchers...) + Ok(t, err) + result := map[string][]sample{} for ss.Next() { @@ -49,12 +52,12 @@ func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample { t, v := it.At() samples = append(samples, sample{t: t, v: v}) } - require.NoError(t, it.Err()) + Ok(t, it.Err()) name := series.Labels().String() result[name] = samples } - require.NoError(t, ss.Err()) + Ok(t, ss.Err()) return result } @@ -70,7 +73,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { querier, err := db.Querier(0, 1) require.NoError(t, err) - seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) + seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) require.Equal(t, seriesSet, map[string][]sample{}) require.NoError(t, querier.Close()) @@ -82,7 +85,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { require.NoError(t, err) defer querier.Close() - seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) + seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar")) require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}) } @@ -102,7 +105,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { require.NoError(t, err) defer querier.Close() - seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) + seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) require.Equal(t, seriesSet, map[string][]sample{}) } @@ -146,7 +149,7 @@ func TestDBAppenderAddRef(t *testing.T) { q, err := db.Querier(0, 200) require.NoError(t, err) - res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b"))) + res := query(t, q, labels.NewEqualMatcher("a", "b")) require.Equal(t, map[string][]sample{ labels.FromStrings("a", "b").String(): []sample{ @@ -198,7 +201,8 @@ Outer: q, err := db.Querier(0, numSamples) require.NoError(t, err) - res := q.Select(labels.NewEqualMatcher("a", "b")) + res, err := q.Select(labels.NewEqualMatcher("a", "b")) + require.NoError(t, err) expSamples := make([]sample, 0, len(c.remaint)) for _, ts := range c.remaint { @@ -294,8 +298,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { q, err := db.Querier(0, 10) require.NoError(t, err) - ss := q.Select(labels.NewEqualMatcher("a", "b")) - ssMap := readSeriesSet(t, ss) + ssMap := query(t, q, labels.NewEqualMatcher("a", "b")) require.Equal(t, map[string][]sample{ labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}}, @@ -314,8 +317,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { q, err = db.Querier(0, 10) require.NoError(t, err) - ss = q.Select(labels.NewEqualMatcher("a", "b")) - ssMap = readSeriesSet(t, ss) + ssMap = query(t, q, labels.NewEqualMatcher("a", "b")) require.Equal(t, map[string][]sample{ labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}}, @@ -352,7 +354,9 @@ func TestDB_Snapshot(t *testing.T) { defer querier.Close() // sum values - seriesSet := querier.Select(labels.NewEqualMatcher("foo", "bar")) + seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) + require.NoError(t, err) + sum := 0.0 for seriesSet.Next() { series := seriesSet.At().Iterator() @@ -500,7 +504,8 @@ func TestDB_e2e(t *testing.T) { q, err := db.Querier(mint, maxt) require.NoError(t, err) - ss := q.Select(qry.ms...) + ss, err := q.Select(qry.ms...) + require.NoError(t, err) result := map[string][]sample{} diff --git a/head.go b/head.go index bd22148d16..51a7896a2c 100644 --- a/head.go +++ b/head.go @@ -575,7 +575,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := h.indexRange(mint, maxt) pr := newPostingsReader(ir) - p, absent := pr.Select(ms...) + p, absent, err := pr.Select(ms...) + if err != nil { + return errors.Wrap(err, "select series") + } var stones []Stone diff --git a/head_test.go b/head_test.go index a265d5b343..277519ad43 100644 --- a/head_test.go +++ b/head_test.go @@ -328,7 +328,8 @@ Outer: // Compare the result. q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) require.NoError(t, err) - res := q.Select(labels.NewEqualMatcher("a", "b")) + res, err := q.Select(labels.NewEqualMatcher("a", "b")) + require.NoError(t, err) expSamples := make([]sample, 0, len(c.remaint)) for _, ts := range c.remaint { diff --git a/postings.go b/postings.go index 2647f4dd8e..63fb1e31a0 100644 --- a/postings.go +++ b/postings.go @@ -165,6 +165,11 @@ func (e errPostings) Err() error { return e.err } var emptyPostings = errPostings{} +// EmptyPostings returns a postings list that's always empty. +func EmptyPostings() Postings { + return emptyPostings +} + // Intersect returns a new postings list over the intersection of the // input postings. func Intersect(its ...Postings) Postings { diff --git a/querier.go b/querier.go index ed8b64ceac..89a9547952 100644 --- a/querier.go +++ b/querier.go @@ -27,7 +27,7 @@ import ( // time range. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(...labels.Matcher) SeriesSet + Select(...labels.Matcher) (SeriesSet, error) // LabelValues returns all potential values for a label name. LabelValues(string) ([]string, error) @@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { return nil, fmt.Errorf("not implemented") } -func (q *querier) Select(ms ...labels.Matcher) SeriesSet { +func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) { return q.sel(q.blocks, ms) } -func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet { +func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) { if len(qs) == 0 { - return nopSeriesSet{} + return EmptySeriesSet(), nil } if len(qs) == 1 { return qs[0].Select(ms...) } l := len(qs) / 2 - return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms)) + + a, err := q.sel(qs[:l], ms) + if err != nil { + return nil, err + } + b, err := q.sel(qs[l:], ms) + if err != nil { + return nil, err + } + return newMergedSeriesSet(a, b), nil } func (q *querier) Close() error { @@ -141,10 +150,13 @@ type blockQuerier struct { mint, maxt int64 } -func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { +func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) { pr := newPostingsReader(q.index) - p, absent := pr.Select(ms...) + p, absent, err := pr.Select(ms...) + if err != nil { + return nil, err + } return &blockSeriesSet{ set: &populatedChunkSeries{ @@ -162,7 +174,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { mint: q.mint, maxt: q.maxt, - } + }, nil } func (q *blockQuerier) LabelValues(name string) ([]string, error) { @@ -205,7 +217,7 @@ func newPostingsReader(i IndexReader) *postingsReader { return &postingsReader{index: i} } -func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) { +func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) { var ( its []Postings absent []string @@ -217,12 +229,16 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) { absent = append(absent, m.Name()) continue } - its = append(its, r.selectSingle(m)) + it, err := r.selectSingle(m) + if err != nil { + return nil, nil, err + } + its = append(its, it) } p := Intersect(its...) - return r.index.SortedPostings(p), absent + return r.index.SortedPostings(p), absent, nil } // tuplesByPrefix uses binary search to find prefix matches within ts. @@ -256,33 +272,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) return matches, nil } -func (r *postingsReader) selectSingle(m labels.Matcher) Postings { +func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) { // Fast-path for equal matching. if em, ok := m.(*labels.EqualMatcher); ok { it, err := r.index.Postings(em.Name(), em.Value()) if err != nil { - return errPostings{err: err} + return nil, err } - return it + return it, nil } tpls, err := r.index.LabelValues(m.Name()) if err != nil { - return errPostings{err: err} + return nil, err } var res []string if pm, ok := m.(*labels.PrefixMatcher); ok { res, err = tuplesByPrefix(pm, tpls) if err != nil { - return errPostings{err: err} + return nil, err } } else { for i := 0; i < tpls.Len(); i++ { vals, err := tpls.At(i) if err != nil { - return errPostings{err: err} + return nil, err } if m.Matches(vals[0]) { res = append(res, vals[0]) @@ -291,7 +307,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings { } if len(res) == 0 { - return emptyPostings + return EmptyPostings(), nil } var rit []Postings @@ -299,12 +315,12 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings { for _, v := range res { it, err := r.index.Postings(m.Name(), v) if err != nil { - return errPostings{err: err} + return nil, err } rit = append(rit, it) } - return Merge(rit...) + return Merge(rit...), nil } func mergeStrings(a, b []string) []string { @@ -342,11 +358,12 @@ type SeriesSet interface { Err() error } -type nopSeriesSet struct{} +var emptySeriesSet = errSeriesSet{} -func (nopSeriesSet) Next() bool { return false } -func (nopSeriesSet) At() Series { return nil } -func (nopSeriesSet) Err() error { return nil } +// EmptySeriesSet returns a series set that's always empty. +func EmptySeriesSet() SeriesSet { + return emptySeriesSet +} // mergedSeriesSet takes two series sets as a single series set. The input series sets // must be sorted and sequential in time, i.e. if they have the same label set, diff --git a/querier_test.go b/querier_test.go index b6f95bf7ce..e302e29838 100644 --- a/querier_test.go +++ b/querier_test.go @@ -460,7 +460,8 @@ Outer: maxt: c.maxt, } - res := querier.Select(c.ms...) + res, err := querier.Select(c.ms...) + require.NoError(t, err) for { eok, rok := c.exp.Next(), res.Next() @@ -632,7 +633,8 @@ Outer: maxt: c.maxt, } - res := querier.Select(c.ms...) + res, err := querier.Select(c.ms...) + require.NoError(t, err) for { eok, rok := c.exp.Next(), res.Next() @@ -1228,7 +1230,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) { sel = func(sets []SeriesSet) SeriesSet { if len(sets) == 0 { - return nopSeriesSet{} + return EmptySeriesSet() } if len(sets) == 1 { return sets[0]