Add matching of empty label

This commit is contained in:
Fabian Reinartz 2016-12-30 19:36:28 +01:00
parent eb4f366988
commit a009247ab7
3 changed files with 40 additions and 13 deletions

2
db.go
View file

@ -251,7 +251,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
err := s.head.appendBatch(samples) err := s.head.appendBatch(samples)
// TODO(fabxc): randomize over time and use better scoring function. // TODO(fabxc): randomize over time and use better scoring function.
if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 24000 {
select { select {
case s.persistCh <- struct{}{}: case s.persistCh <- struct{}{}:
go func() { go func() {

View file

@ -220,16 +220,26 @@ func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQue
} }
func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
var its []Postings var (
its []Postings
absent []string
)
for _, m := range ms { for _, m := range ms {
// If the matcher checks absence of a label, don't select them
// but propagate the check into the series set.
if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") {
absent = append(absent, m.Name())
continue
}
its = append(its, q.selectSingle(m)) its = append(its, q.selectSingle(m))
} }
return &blockSeriesSet{ return &blockSeriesSet{
index: q.index, index: q.index,
it: Intersect(its...), it: Intersect(its...),
mint: q.mint, absent: absent,
maxt: q.maxt, mint: q.mint,
maxt: q.maxt,
} }
} }
@ -412,8 +422,9 @@ func (s *shardSeriesSet) Next() bool {
// blockSeriesSet is a set of series from an inverted index query. // blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct { type blockSeriesSet struct {
index IndexReader index IndexReader
it Postings it Postings // postings list referencing series
mint, maxt int64 absent []string // labels that must not be set for result series
mint, maxt int64 // considered time range
err error err error
cur Series cur Series
@ -421,18 +432,27 @@ type blockSeriesSet struct {
func (s *blockSeriesSet) Next() bool { func (s *blockSeriesSet) Next() bool {
// Step through the postings iterator to find potential series. // Step through the postings iterator to find potential series.
// Resolving series may return nil if no applicable data for the outer:
// time range exists and we can skip to the next series.
for s.it.Next() { for s.it.Next() {
series, err := s.index.Series(s.it.Value(), s.mint, s.maxt) series, err := s.index.Series(s.it.Value(), s.mint, s.maxt)
if err != nil { if err != nil {
s.err = err s.err = err
return false return false
} }
if series != nil { // Resolving series may return nil if no applicable data for the
s.cur = series // time range exists and we can skip to the next series.
return true if series == nil {
continue
} }
// If a series contains a label that must be absent, it is skipped as well.
for _, abs := range s.absent {
if series.Labels().Get(abs) != "" {
continue outer
}
}
s.cur = series
return true
} }
if s.it.Err() != nil { if s.it.Err() != nil {
s.err = s.it.Err() s.err = s.it.Err()

7
wal.go
View file

@ -188,7 +188,14 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error {
e.buf = append(e.buf, b[:n]...) e.buf = append(e.buf, b[:n]...)
for _, l := range lset { for _, l := range lset {
// func() {
// defer func() {
// if recover() != nil {
// fmt.Println(l)
// }
// }()
n = binary.PutUvarint(b, uint64(len(l.Name))) n = binary.PutUvarint(b, uint64(len(l.Name)))
// }()
e.buf = append(e.buf, b[:n]...) e.buf = append(e.buf, b[:n]...)
e.buf = append(e.buf, l.Name...) e.buf = append(e.buf, l.Name...)