Expose ChunkSeriesSet and lookup methods.

This commit is contained in:
Fabian Reinartz 2017-11-13 13:57:10 +01:00
parent 3ef4326114
commit f1512a368a
4 changed files with 38 additions and 47 deletions

View file

@@ -283,8 +283,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 		return ErrClosing
 	}
 
-	pr := newPostingsReader(pb.indexr)
-	p, absent, err := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(pb.indexr, ms...)
 	if err != nil {
 		return errors.Wrap(err, "select series")
 	}

View file

@@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 // of the provided blocks. It returns meta information for the new block.
 func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
 	var (
-		set        compactionSet
+		set        ChunkSeriesSet
 		allSymbols = make(map[string]struct{}, 1<<16)
 		closers    = []io.Closer{}
 	)
@@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	return nil
 }
 
-type compactionSet interface {
-	Next() bool
-	At() (labels.Labels, []ChunkMeta, Intervals)
-	Err() error
-}
-
 type compactionSeriesSet struct {
 	p          Postings
 	index      IndexReader
 	chunks     ChunkReader
 	tombstones TombstoneReader
-	series     SeriesSet
 
 	l labels.Labels
 	c []ChunkMeta
@@ -679,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
 }
 
 type compactionMerger struct {
-	a, b     compactionSet
+	a, b     ChunkSeriesSet
 	aok, bok bool
 
 	l labels.Labels
@@ -692,7 +685,7 @@ type compactionSeries struct {
 	chunks []*ChunkMeta
 }
 
-func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
 		b: b,

View file

@@ -574,8 +574,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	ir := h.indexRange(mint, maxt)
 
-	pr := newPostingsReader(ir)
-	p, absent, err := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(ir, ms...)
 	if err != nil {
 		return errors.Wrap(err, "select series")
 	}

View file

@@ -151,22 +151,13 @@ type blockQuerier struct {
 }
 
 func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
-	pr := newPostingsReader(q.index)
-	p, absent, err := pr.Select(ms...)
+	base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
 	if err != nil {
 		return nil, err
 	}
 
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
-			set: &baseChunkSeries{
-				p:          p,
-				index:      q.index,
-				absent:     absent,
-				tombstones: q.tombstones,
-			},
+			set:    base,
 			chunks: q.chunks,
 			mint:   q.mint,
 			maxt:   q.maxt,
@@ -208,16 +199,10 @@ func (q *blockQuerier) Close() error {
 	return merr.Err()
 }
 
-// postingsReader is used to select matching postings from an IndexReader.
-type postingsReader struct {
-	index IndexReader
-}
-
-func newPostingsReader(i IndexReader) *postingsReader {
-	return &postingsReader{index: i}
-}
-
-func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) {
+// PostingsForMatchers assembles a single postings iterator against the index reader
+// based on the given matchers. It returns a list of label names that must be manually
+// checked to not exist in series the postings list points to.
+func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
 	var (
 		its    []Postings
 		absent []string
@@ -229,16 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error
 			absent = append(absent, m.Name())
 			continue
 		}
-		it, err := r.selectSingle(m)
+		it, err := postingsForMatcher(index, m)
 		if err != nil {
 			return nil, nil, err
 		}
 		its = append(its, it)
 	}
-
-	p := Intersect(its...)
-
-	return r.index.SortedPostings(p), absent, nil
+	return index.SortedPostings(Intersect(its...)), absent, nil
 }
 
 // tuplesByPrefix uses binary search to find prefix matches within ts.
@@ -272,17 +254,17 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
 	return matches, nil
 }
 
-func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) {
+func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
-		it, err := r.index.Postings(em.Name(), em.Value())
+		it, err := index.Postings(em.Name(), em.Value())
 		if err != nil {
 			return nil, err
 		}
 		return it, nil
 	}
 
-	tpls, err := r.index.LabelValues(m.Name())
+	tpls, err := index.LabelValues(m.Name())
 	if err != nil {
 		return nil, err
 	}
@@ -313,7 +295,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) {
 	var rit []Postings
 
 	for _, v := range res {
-		it, err := r.index.Postings(m.Name(), v)
+		it, err := index.Postings(m.Name(), v)
 		if err != nil {
 			return nil, err
 		}
@@ -435,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
 	return true
 }
 
-type chunkSeriesSet interface {
+type ChunkSeriesSet interface {
 	Next() bool
 	At() (labels.Labels, []ChunkMeta, Intervals)
 	Err() error
@@ -455,6 +437,24 @@ type baseChunkSeries struct {
 	err        error
 }
 
+// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
+// over them. It drops chunks based on tombstones in the given reader.
+func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
+	if tr == nil {
+		tr = EmptyTombstoneReader()
+	}
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return nil, err
+	}
+	return &baseChunkSeries{
+		p:          p,
+		index:      ir,
+		tombstones: tr,
+		absent:     absent,
+	}, nil
+}
+
 func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
 	return s.lset, s.chks, s.intervals
 }
@@ -518,7 +518,7 @@ Outer:
 // with known chunk references. It filters out chunks that do not fit the
 // given time range.
 type populatedChunkSeries struct {
-	set    chunkSeriesSet
+	set    ChunkSeriesSet
 	chunks ChunkReader
 
 	mint, maxt int64
@@ -575,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
-	set chunkSeriesSet
+	set ChunkSeriesSet
 	err error
 	cur Series