diff --git a/db.go b/db.go index 103544d390..78aa7ad84e 100644 --- a/db.go +++ b/db.go @@ -362,13 +362,6 @@ func (cd *chunkDesc) append(ts int64, v float64) (err error) { return nil } -// LabelRefs contains a reference to a label set that can be resolved -// against a Querier. -type LabelRefs struct { - block uint64 - offsets []uint32 -} - // Label is a key/value pair of strings. type Label struct { Name, Value string diff --git a/querier.go b/querier.go index 59b77095f0..ab8d95458b 100644 --- a/querier.go +++ b/querier.go @@ -2,25 +2,35 @@ package tsdb import ( "fmt" + "strings" "github.com/fabxc/tsdb/chunks" ) // Matcher matches a string. type Matcher interface { + Name() string // Name returns the label name whose values the matcher is applied to. // Match returns true if the matcher applies to the string value. Match(v string) bool } +type equalMatcher struct { + name string // label name reported by Name + value string // label value that Match tests for equality +} + +func MatchEquals(n, v string) Matcher { + return &equalMatcher{name: n, value: v} // n is the label name, v the only value that matches +} + +func (m *equalMatcher) Name() string { return m.name } +func (m *equalMatcher) Match(v string) bool { return v == m.value } + // Querier provides querying access over time series data of a fixed // time range. type Querier interface { - // Iterator returns an interator over the inverted index that - // matches the key label by the constraints of Matcher. - Iterator(key string, m Matcher) Iterator - - // Series returns series provided in the index iterator. - Series(Iterator) ([]Series, error) + // Select returns a set of series that matches the given label matchers. + Select(...Matcher) SeriesSet // LabelValues returns all potential values for a label name.
LabelValues(string) ([]string, error) @@ -32,32 +42,14 @@ type Querier interface { Close() error } -func example(db *DB) error { - var m1, m2, m3, m4 Matcher - - q := db.Querier(0, 1000) - - series, err := q.Series( - Merge( - Intersect(q.Iterator("name", m1), q.Iterator("job", m2)), - Intersect(q.Iterator("name", m3), q.Iterator("job", m4)), - ), - ) - if err != nil { - return err - } - for _, s := range series { - s.Iterator() - } - return nil -} - // Series represents a single time series. type Series interface { // Labels returns the complete set of labels identifying the series. Labels() Labels // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator + + // Ref() uint32 } func inRange(x, mint, maxt int64) bool { @@ -86,18 +78,22 @@ func (db *DB) Querier(mint, maxt int64) Querier { // SeriesSet contains a set of series. type SeriesSet interface { + Next() bool // Next advances to the next series and reports whether there is one. + Series() Series // Series returns the series the set is currently positioned at. + Err() error // Err returns the error, if any, that stopped the iteration. } -func (q *querier) Select(key string, m Matcher) SeriesSet { - return nil -} +func (q *querier) Select(ms ...Matcher) SeriesSet { + // We gather the non-overlapping series from every shard and simply + // return their union. + r := &mergedSeriesSet{} -func (q *querier) Iterator(key string, m Matcher) Iterator { - return nil -} - -func (q *querier) Series(Iterator) ([]Series, error) { - return nil, nil + for _, s := range q.shards { + if set := s.Select(ms...); set != nil { // a shard without blocks returns a nil set; skip it + r.sets = append(r.sets, set) + } + } + return r } func (q *querier) LabelValues(string) ([]string, error) { @@ -133,39 +129,139 @@ func (s *SeriesShard) Querier(mint, maxt int64) Querier { return sq } -func (q *shardQuerier) Iterator(name string, m Matcher) Iterator { - // Iterators from different blocks have no time overlap. The reference numbers - // they emit point to series sorted in lexicographic order. - // If actually retrieving an iterator result via the Series method, we can fully - // deduplicate series by simply comparing with the previous label set.
- var rit Iterator +type mergedSeriesSet struct { + sets []SeriesSet // sets to emit, one after another - for _, s := range q.blocks { - rit = Merge(rit, s.Iterator(name, m)) - } - - return rit + cur int // index into sets of the set currently being read + err error // error of the set that stopped the iteration; returned by Err } -func (q *shardQuerier) Series(it Iterator) ([]Series, error) { - // Dedulicate series as we stream through the iterator. See comment - // on the Iterator method. +func (s *mergedSeriesSet) Series() Series { return s.sets[s.cur].Series() } +func (s *mergedSeriesSet) Err() error { return s.err } // safe to call after the last set is exhausted - var series []Series - // var prev Labels +func (s *mergedSeriesSet) Next() bool { + // TODO(fabxc): We just emit the sets one after one. They are each + // lexicographically sorted. Should we emit their union sorted too? + for s.err == nil && s.cur < len(s.sets) { + if s.sets[s.cur].Next() { + return true + } - // for it.Next() { - // s, err := q.index.Series(it.Value()) - // if err != nil { - // return nil, err - // } - // series = append(series, s) - // } - // if it.Err() != nil { - // return nil, it.Err() - // } + s.err = s.sets[s.cur].Err() // an error from one set ends the whole iteration + s.cur++ + } + return false +} - return series, nil +type shardSeriesSet struct { + a, b SeriesSet // the two sets being merged + + cur Series // series returned by Series + as, bs Series // peek ahead of each set +} + +func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet { + s := &shardSeriesSet{a: a, b: b} + // Initialize first elements of both sets as Next() needs + // one element look-ahead. + s.advanceA() + s.advanceB() + + return s +} + +// compareLabels compares the two label sets. +// The result will be 0 if a==b, <0 if a < b, and >0 if a > b. +func compareLabels(a, b Labels) int { + // Walk the common prefix pairwise. Names are compared first, then values, + // so label sets that differ only in a value are never reported as equal. + for i := 0; i < len(a) && i < len(b); i++ { + if d := strings.Compare(a[i].Name, b[i].Name); d != 0 { + return d + } + if d := strings.Compare(a[i].Value, b[i].Value); d != 0 { + return d + } + } + // If all labels so far were in common, the set with fewer labels comes first.
+ return len(a) - len(b) // negative when a has fewer labels, so the shorter set sorts first +} + +func (s *shardSeriesSet) Series() Series { + return s.cur +} + +func (s *shardSeriesSet) Err() error { + if s.a.Err() != nil { + return s.a.Err() + } + return s.b.Err() // a's error takes precedence over b's +} + +func (s *shardSeriesSet) compare() int { + if s.as == nil { + return 1 // a is exhausted: rank it after b so that b is emitted + } + if s.bs == nil { + return -1 // b is exhausted: rank a first + } + return compareLabels(s.as.Labels(), s.bs.Labels()) +} + +func (s *shardSeriesSet) advanceA() { + if s.a.Next() { + s.as = s.a.Series() + } else { + s.as = nil + } +} + +func (s *shardSeriesSet) advanceB() { + if s.b.Next() { + s.bs = s.b.Series() + } else { + s.bs = nil + } +} + +func (s *shardSeriesSet) Next() bool { + if s.as == nil && s.bs == nil { + return false + } + + d := s.compare() + // Emit the smaller of the two look-ahead series. An exhausted set compares + // as greater, so the other one keeps being drained until both are empty. + if d > 0 { + s.cur = s.bs + s.advanceB() + } else if d < 0 { + s.cur = s.as + s.advanceA() + } else { + // Both sets contain the current series. Chain them into a single one. + s.cur = &chainedSeries{series: []Series{s.as, s.bs}} + s.advanceA() + s.advanceB() + } + return true +} + +func (q *shardQuerier) Select(ms ...Matcher) SeriesSet { + // Sets from different blocks have no time overlap. The reference numbers + // they emit point to series sorted in lexicographic order. + // We can fully connect partial series by simply comparing with the previous + // label set. + if len(q.blocks) == 0 { + return nil // NOTE(review): callers get a nil SeriesSet here and must check it before calling Next/Series/Err + } + r := q.blocks[0].Select(ms...) + + for _, s := range q.blocks[1:] { + r = newShardSeriesSet(r, s.Select(ms...)) // the constructor primes the look-ahead that Next relies on + } + + return r } func (q *shardQuerier) LabelValues(string) ([]string, error) { @@ -182,10 +276,10 @@ func (q *shardQuerier) Close() error { // blockQuerier provides querying access to a single block database.
type blockQuerier struct { - mint, maxt int64 - index IndexReader series SeriesReader + + mint, maxt int64 } func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier { @@ -197,8 +291,22 @@ func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQue } } -func (q *blockQuerier) Iterator(name string, m Matcher) Iterator { - tpls, err := q.index.LabelValues(name) +func (q *blockQuerier) Select(ms ...Matcher) SeriesSet { + var its []Iterator + for _, m := range ms { + its = append(its, q.selectSingle(m)) + } + + // TODO(fabxc): pass down time range so the series iterator + // can be instantiated with it? + return &blockSeriesSet{ + index: q.index, + it: Intersect(its...), // one postings iterator per matcher, combined via Intersect + } +} + +func (q *blockQuerier) selectSingle(m Matcher) Iterator { + tpls, err := q.index.LabelValues(m.Name()) if err != nil { return errIterator{err: err} } @@ -219,7 +327,7 @@ func (q *blockQuerier) Iterator(name string, m Matcher) Iterator { var rit Iterator for _, v := range res { - it, err := q.index.Postings(name, v) + it, err := q.index.Postings(m.Name(), v) if err != nil { return errIterator{err: err} } @@ -229,23 +337,6 @@ func (q *blockQuerier) Iterator(name string, m Matcher) Iterator { return rit } -func (q *blockQuerier) Series(it Iterator) ([]Series, error) { - var series []Series - - for it.Next() { - s, err := q.index.Series(it.Value()) - if err != nil { - return nil, err - } - series = append(series, s) - } - if it.Err() != nil { - return nil, it.Err() - } - - return series, nil -} - func (q *blockQuerier) LabelValues(name string) ([]string, error) { tpls, err := q.index.LabelValues(name) if err != nil { @@ -271,6 +362,36 @@ func (q *blockQuerier) Close() error { return nil } +// blockSeriesSet is a set of series from an inverted index query. +type blockSeriesSet struct { + index IndexReader // used by Next to resolve a reference into a series + it Iterator // source of the series references consumed by Next + + err error // set by Next when it stops; returned by Err + cur Series // series resolved by the last successful Next; returned by Series +} + +func (s *blockSeriesSet) Next() bool { + // Get next reference from postings iterator.
+ if !s.it.Next() { + s.err = s.it.Err() + return false + } + + // Resolve reference to series. + series, err := s.index.Series(s.it.Value()) + if err != nil { + s.err = err + return false + } + + s.cur = series + return true +} + +func (s *blockSeriesSet) Series() Series { return s.cur } +func (s *blockSeriesSet) Err() error { return s.err } + // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. @@ -285,6 +406,75 @@ type SeriesIterator interface { Err() error } +// chainedSeries is a series assembled from several partial series that share one label set. +type chainedSeries struct { + series []Series +} + +// Labels returns the label set of the first partial series. +func (s *chainedSeries) Labels() Labels { + return s.series[0].Labels() +} + +// Iterator returns an iterator that reads the partial series one after another. +func (s *chainedSeries) Iterator() SeriesIterator { + it := &chainedSeriesIterator{ + series: make([]SeriesIterator, 0, len(s.series)), + } + for _, series := range s.series { + it.series = append(it.series, series.Iterator()) + } + return it +} + +// chainedSeriesIterator implements a series iterator over a list +// of time-sorted, non-overlapping series iterators. +type chainedSeriesIterator struct { + series []SeriesIterator + i int // index into series of the iterator currently being read +} + +// advance applies step to the current iterator and falls through to the +// following ones for as long as step reports false without an error. +func (it *chainedSeriesIterator) advance(step func(SeriesIterator) bool) bool { + for ; it.i < len(it.series); it.i++ { + cur := it.series[it.i] + if step(cur) { + return true + } + if cur.Err() != nil { + return false + } + } + return false +} + +// Seek forwards the request to the current iterator and, while that one reports false without an error, to the ones after it. +func (it *chainedSeriesIterator) Seek(t int64) bool { + return it.advance(func(s SeriesIterator) bool { return s.Seek(t) }) +} + +// Values returns the values of the current iterator, or zeros once every iterator is exhausted. +func (it *chainedSeriesIterator) Values() (t int64, v float64) { + if it.i >= len(it.series) { + return 0, 0 + } + return it.series[it.i].Values() +} + +// Next advances the current iterator, moving on to the following one when it is exhausted without an error. +func (it *chainedSeriesIterator) Next() bool { + return it.advance(SeriesIterator.Next) +} + +// Err returns the error of the iterator that stopped the iteration, or nil after a clean end. +func (it *chainedSeriesIterator) Err() error { + if it.i >= len(it.series) { + return nil + } + return it.series[it.i].Err() +} + // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct {