prometheus/tsdb/querier.go
Bartlomiej Plotka 28c5cfaf0d
tsdb: Moved code merge series and iterators to differen files; cleanup. No functional changes just move! (#7714)
I did not want to move those in previous PR to make it easier to review. Now small cleanup time for readability. (:

## Changes

* Merge series goes to `storage/merge.go` leaving `fanout.go` for just fanout code.
* Moved `fanout test` code from weird separate package to storage.
* Unskiped one test: TestFanout_SelectSorted/chunk_querier
* Moved block series set codes responsible for querying blocks to `querier.go` from `compact.go`



Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-08-03 11:32:56 +01:00

820 lines
21 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"math"
"sort"
"strings"
"unicode/utf8"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped.
var regexMetaCharacterBytes [16]byte
// isRegexMetaCharacter reports whether byte b needs to be escaped.
func isRegexMetaCharacter(b byte) bool {
return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
}
func init() {
for _, b := range []byte(`.+*?()|[]{}^$`) {
regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
}
}
type blockBaseQuerier struct {
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
closed bool
mint, maxt int64
}
func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
indexr, err := b.Index()
if err != nil {
return nil, errors.Wrap(err, "open index reader")
}
chunkr, err := b.Chunks()
if err != nil {
indexr.Close()
return nil, errors.Wrap(err, "open chunk reader")
}
tombsr, err := b.Tombstones()
if err != nil {
indexr.Close()
chunkr.Close()
return nil, errors.Wrap(err, "open tombstone reader")
}
if tombsr == nil {
tombsr = tombstones.NewMemTombstones()
}
return &blockBaseQuerier{
mint: mint,
maxt: maxt,
index: indexr,
chunks: chunkr,
tombstones: tombsr,
}, nil
}
func (q *blockBaseQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
res, err := q.index.SortedLabelValues(name)
return res, nil, err
}
func (q *blockBaseQuerier) LabelNames() ([]string, storage.Warnings, error) {
res, err := q.index.LabelNames()
return res, nil, err
}
func (q *blockBaseQuerier) Close() error {
if q.closed {
return errors.New("block querier already closed")
}
var merr tsdb_errors.MultiError
merr.Add(q.index.Close())
merr.Add(q.chunks.Close())
merr.Add(q.tombstones.Close())
q.closed = true
return merr.Err()
}
type blockQuerier struct {
*blockBaseQuerier
}
// NewBlockQuerier returns a querier against the block reader and requested min and max time range.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
q, err := newBlockBaseQuerier(b, mint, maxt)
if err != nil {
return nil, err
}
return &blockQuerier{blockBaseQuerier: q}, nil
}
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
mint := q.mint
maxt := q.maxt
if hints != nil {
mint = hints.Start
maxt = hints.End
}
p, err := PostingsForMatchers(q.index, ms...)
if err != nil {
return storage.ErrSeriesSet(err)
}
if sortSeries {
p = q.index.SortedPostings(p)
}
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
}
// blockChunkQuerier provides chunk querying access to a single block database.
type blockChunkQuerier struct {
*blockBaseQuerier
}
// NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.
func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := newBlockBaseQuerier(b, mint, maxt)
if err != nil {
return nil, err
}
return &blockChunkQuerier{blockBaseQuerier: q}, nil
}
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
mint := q.mint
maxt := q.maxt
if hints != nil {
mint = hints.Start
maxt = hints.End
}
p, err := PostingsForMatchers(q.index, ms...)
if err != nil {
return storage.ErrChunkSeriesSet(err)
}
if sortSeries {
p = q.index.SortedPostings(p)
}
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
}
func findSetMatches(pattern string) []string {
// Return empty matches if the wrapper from Prometheus is missing.
if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
return nil
}
escaped := false
sets := []*strings.Builder{{}}
for i := 4; i < len(pattern)-2; i++ {
if escaped {
switch {
case isRegexMetaCharacter(pattern[i]):
sets[len(sets)-1].WriteByte(pattern[i])
case pattern[i] == '\\':
sets[len(sets)-1].WriteByte('\\')
default:
return nil
}
escaped = false
} else {
switch {
case isRegexMetaCharacter(pattern[i]):
if pattern[i] == '|' {
sets = append(sets, &strings.Builder{})
} else {
return nil
}
case pattern[i] == '\\':
escaped = true
default:
sets[len(sets)-1].WriteByte(pattern[i])
}
}
}
matches := make([]string, 0, len(sets))
for _, s := range sets {
if s.Len() > 0 {
matches = append(matches, s.String())
}
}
return matches
}
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
var its, notIts []index.Postings
// See which label must be non-empty.
// Optimization for case like {l=~".", l!="1"}.
labelMustBeSet := make(map[string]bool, len(ms))
for _, m := range ms {
if !m.Matches("") {
labelMustBeSet[m.Name] = true
}
}
for _, m := range ms {
if labelMustBeSet[m.Name] {
// If this matcher must be non-empty, we can be smarter.
matchesEmpty := m.Matches("")
isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
if isNot && matchesEmpty { // l!="foo"
// If the label can't be empty and is a Not and the inner matcher
// doesn't match empty, then subtract it out at the end.
inverse, err := m.Inverse()
if err != nil {
return nil, err
}
it, err := postingsForMatcher(ix, inverse)
if err != nil {
return nil, err
}
notIts = append(notIts, it)
} else if isNot && !matchesEmpty { // l!=""
// If the label can't be empty and is a Not, but the inner matcher can
// be empty we need to use inversePostingsForMatcher.
inverse, err := m.Inverse()
if err != nil {
return nil, err
}
it, err := inversePostingsForMatcher(ix, inverse)
if err != nil {
return nil, err
}
its = append(its, it)
} else { // l="a"
// Non-Not matcher, use normal postingsForMatcher.
it, err := postingsForMatcher(ix, m)
if err != nil {
return nil, err
}
its = append(its, it)
}
} else { // l=""
// If the matchers for a labelname selects an empty value, it selects all
// the series which don't have the label name set too. See:
// https://github.com/prometheus/prometheus/issues/3575 and
// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
it, err := inversePostingsForMatcher(ix, m)
if err != nil {
return nil, err
}
notIts = append(notIts, it)
}
}
// If there's nothing to subtract from, add in everything and remove the notIts later.
if len(its) == 0 && len(notIts) != 0 {
k, v := index.AllPostingsKey()
allPostings, err := ix.Postings(k, v)
if err != nil {
return nil, err
}
its = append(its, allPostings)
}
it := index.Intersect(its...)
for _, n := range notIts {
it = index.Without(it, n)
}
return it, nil
}
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
// This method will not return postings for missing labels.
// Fast-path for equal matching.
if m.Type == labels.MatchEqual {
return ix.Postings(m.Name, m.Value)
}
// Fast-path for set matching.
if m.Type == labels.MatchRegexp {
setMatches := findSetMatches(m.GetRegexString())
if len(setMatches) > 0 {
sort.Strings(setMatches)
return ix.Postings(m.Name, setMatches...)
}
}
vals, err := ix.LabelValues(m.Name)
if err != nil {
return nil, err
}
var res []string
lastVal, isSorted := "", true
for _, val := range vals {
if m.Matches(val) {
res = append(res, val)
if isSorted && val < lastVal {
isSorted = false
}
lastVal = val
}
}
if len(res) == 0 {
return index.EmptyPostings(), nil
}
if !isSorted {
sort.Strings(res)
}
return ix.Postings(m.Name, res...)
}
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
vals, err := ix.LabelValues(m.Name)
if err != nil {
return nil, err
}
var res []string
lastVal, isSorted := "", true
for _, val := range vals {
if !m.Matches(val) {
res = append(res, val)
if isSorted && val < lastVal {
isSorted = false
}
lastVal = val
}
}
if !isSorted {
sort.Strings(res)
}
return ix.Postings(m.Name, res...)
}
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
mint, maxt int64
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
bufChks []chunks.Meta
err error
}
func (b *blockBaseSeriesSet) Next() bool {
var lbls labels.Labels
for b.p.Next() {
if err := b.index.Series(b.p.At(), &lbls, &b.bufChks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
continue
}
b.err = errors.Wrapf(err, "get series %d", b.p.At())
return false
}
if len(b.bufChks) == 0 {
continue
}
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
b.err = errors.Wrap(err, "get tombstones")
return false
}
// NOTE:
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
// * requested time ranges are closed: [req.Start, req.End].
var trimFront, trimBack bool
// Copy chunks as iteratables are reusable.
chks := make([]chunks.Meta, 0, len(b.bufChks))
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
for _, chk := range b.bufChks {
if chk.MaxTime < b.mint {
continue
}
if chk.MinTime > b.maxt {
continue
}
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
chks = append(chks, chk)
}
// If still not entirely deleted, check if trim is needed based on requested time range.
if chk.MinTime < b.mint {
trimFront = true
}
if chk.MaxTime > b.maxt {
trimBack = true
}
}
if len(chks) == 0 {
continue
}
if trimFront {
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
}
if trimBack {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = lbls
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
}
return true
}
return false
}
func (b *blockBaseSeriesSet) Err() error {
if b.err != nil {
return b.err
}
return b.p.Err()
}
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
// that chunks are trimmed based on given tombstones interval if any.
//
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
// is invalid and chunk rewrite is needed, currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to the same, single series.
chks []chunks.Meta
i int
err error
bufIter *deletedIterator
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
}
func newPopulateWithDelGenericSeriesIterator(
chunks ChunkReader,
chks []chunks.Meta,
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
chunks: chunks,
chks: chks,
i: -1,
bufIter: &deletedIterator{},
intervals: intervals,
}
}
func (p *populateWithDelGenericSeriesIterator) next() bool {
if p.err != nil || p.i >= len(p.chks)-1 {
return false
}
p.i++
p.currChkMeta = p.chks[p.i]
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
return false
}
p.bufIter.intervals = p.bufIter.intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.intervals = p.bufIter.intervals.Add(interval)
}
}
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block or just fetching chunks from TSDB.
//
// TODO think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
return true
}
// We don't want full chunk or it's potentially still opened, take just part of it.
p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
return true
}
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunkenc.Iterator
}
func (p *populateWithDelSeriesIterator) Next() bool {
if p.curr != nil && p.curr.Next() {
return true
}
for p.next() {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(nil)
}
if p.curr.Next() {
return true
}
}
return false
}
func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
if p.curr != nil && p.curr.Seek(t) {
return true
}
for p.Next() {
if p.curr.Seek(t) {
return true
}
}
return false
}
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
func (p *populateWithDelSeriesIterator) Err() error {
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
return err
}
if p.curr != nil {
return p.curr.Err()
}
return nil
}
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
}
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
p.err = err
return false
}
if !p.currDelIter.Next() {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
return false
}
t, v := p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)
for p.currDelIter.Next() {
t, v = p.currDelIter.At()
app.Append(t, v)
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
return true
}
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.
// Samples from chunks are also trimmed to requested min and max time.
type blockSeriesSet struct {
blockBaseSeriesSet
}
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
return &blockSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockSeriesSet) At() storage.Series {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func() chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
}
}
// blockChunkSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Labelled iterator with no chunks.
// Chunks are also trimmed to requested [min and max] (keeping samples with min and max timestamps).
type blockChunkSeriesSet struct {
blockBaseSeriesSet
}
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func() chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
}
}
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}
type mergedStringIter struct {
a index.StringIter
b index.StringIter
aok, bok bool
cur string
}
func (m *mergedStringIter) Next() bool {
if (!m.aok && !m.bok) || (m.Err() != nil) {
return false
}
if !m.aok {
m.cur = m.b.At()
m.bok = m.b.Next()
} else if !m.bok {
m.cur = m.a.At()
m.aok = m.a.Next()
} else if m.b.At() > m.a.At() {
m.cur = m.a.At()
m.aok = m.a.Next()
} else if m.a.At() > m.b.At() {
m.cur = m.b.At()
m.bok = m.b.Next()
} else { // Equal.
m.cur = m.b.At()
m.aok = m.a.Next()
m.bok = m.b.Next()
}
return true
}
func (m mergedStringIter) At() string { return m.cur }
func (m mergedStringIter) Err() error {
if m.a.Err() != nil {
return m.a.Err()
}
return m.b.Err()
}
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {
it chunkenc.Iterator
intervals tombstones.Intervals
}
func (it *deletedIterator) At() (int64, float64) {
return it.it.At()
}
func (it *deletedIterator) Seek(t int64) bool {
if it.it.Err() != nil {
return false
}
if ok := it.it.Seek(t); !ok {
return false
}
// Now double check if the entry falls into a deleted interval.
ts, _ := it.At()
for _, itv := range it.intervals {
if ts < itv.Mint {
return true
}
if ts > itv.Maxt {
it.intervals = it.intervals[1:]
continue
}
// We're in the middle of an interval, we can now call Next().
return it.Next()
}
// The timestamp is greater than all the deleted intervals.
return true
}
func (it *deletedIterator) Next() bool {
Outer:
for it.it.Next() {
ts, _ := it.it.At()
for _, tr := range it.intervals {
if tr.InBounds(ts) {
continue Outer
}
if ts <= tr.Maxt {
return true
}
it.intervals = it.intervals[1:]
}
return true
}
return false
}
func (it *deletedIterator) Err() error { return it.it.Err() }