prometheus/tsdb/querier.go
Brian Brazil 38d32e0686 Don't sort postings if we only have one block.
Sorting the head's postings can be quite slow.
We only need sorted series when merging with another
querier, so only sort then.
This will make big queries that only touch the head faster,
though queries that touch both the head and a block will still
be the same speed. This probably won't help much with graphing
unless the range is under an hour, however it should make most
recording rules faster.

Add guarantee that remote read streaming produces sorted series.

PromQL benchmarks for histograms show only 2-3% improvement, but
they're only over 1k series.

benchmark                                                old ns/op      new ns/op      delta
BenchmarkQuerierSelect/Head/1of1000000-4                 1375486282     507657736      -63.09%
BenchmarkQuerierSelect/Head/10of1000000-4                1387859004     507769850      -63.41%
BenchmarkQuerierSelect/Head/100of1000000-4               1387087935     506029110      -63.52%
BenchmarkQuerierSelect/Head/1000of1000000-4              1386869064     504521986      -63.62%
BenchmarkQuerierSelect/Head/10000of1000000-4             1386213685     505210422      -63.55%
BenchmarkQuerierSelect/Head/100000of1000000-4            1392754988     529842406      -61.96%
BenchmarkQuerierSelect/Head/1000000of1000000-4           1569414722     725059506      -53.80%
BenchmarkQuerierSelect/SortedHead/1of1000000-4           1381019902     1370495863     -0.76%
BenchmarkQuerierSelect/SortedHead/10of1000000-4          1375696209     1366789468     -0.65%
BenchmarkQuerierSelect/SortedHead/100of1000000-4         1386009422     1364519297     -1.55%
BenchmarkQuerierSelect/SortedHead/1000of1000000-4        1377700532     1364486191     -0.96%
BenchmarkQuerierSelect/SortedHead/10000of1000000-4       1383539536     1369545314     -1.01%
BenchmarkQuerierSelect/SortedHead/100000of1000000-4      1410089163     1394731339     -1.09%
BenchmarkQuerierSelect/SortedHead/1000000of1000000-4     1634744148     1581554956     -3.25%
BenchmarkQuerierSelect/Block/1of1000000-4                881741242      879839470      -0.22%
BenchmarkQuerierSelect/Block/10of1000000-4               880381562      882846038      +0.28%
BenchmarkQuerierSelect/Block/100of1000000-4              887519357      881016916      -0.73%
BenchmarkQuerierSelect/Block/1000of1000000-4             902194205      883433524      -2.08%
BenchmarkQuerierSelect/Block/10000of1000000-4            892321964      885130170      -0.81%
BenchmarkQuerierSelect/Block/100000of1000000-4           938604466      933527150      -0.54%
BenchmarkQuerierSelect/Block/1000000of1000000-4          1313510845     1295881124     -1.34%

benchmark                                                old allocs     new allocs     delta
BenchmarkQuerierSelect/Head/1of1000000-4                 4000056        4000018        -0.00%
BenchmarkQuerierSelect/Head/10of1000000-4                4000074        4000036        -0.00%
BenchmarkQuerierSelect/Head/100of1000000-4               4000254        4000216        -0.00%
BenchmarkQuerierSelect/Head/1000of1000000-4              4002054        4002016        -0.00%
BenchmarkQuerierSelect/Head/10000of1000000-4             4020054        4020016        -0.00%
BenchmarkQuerierSelect/Head/100000of1000000-4            4200054        4200016        -0.00%
BenchmarkQuerierSelect/Head/1000000of1000000-4           6000054        6000016        -0.00%
BenchmarkQuerierSelect/SortedHead/1of1000000-4           4000071        4000071        +0.00%
BenchmarkQuerierSelect/SortedHead/10of1000000-4          4000089        4000089        +0.00%
BenchmarkQuerierSelect/SortedHead/100of1000000-4         4000269        4000269        +0.00%
BenchmarkQuerierSelect/SortedHead/1000of1000000-4        4002069        4002069        +0.00%
BenchmarkQuerierSelect/SortedHead/10000of1000000-4       4020069        4020069        +0.00%
BenchmarkQuerierSelect/SortedHead/100000of1000000-4      4200069        4200069        +0.00%
BenchmarkQuerierSelect/SortedHead/1000000of1000000-4     6000069        6000069        +0.00%
BenchmarkQuerierSelect/Block/1of1000000-4                6000023        6000022        -0.00%
BenchmarkQuerierSelect/Block/10of1000000-4               6000059        6000058        -0.00%
BenchmarkQuerierSelect/Block/100of1000000-4              6000419        6000418        -0.00%
BenchmarkQuerierSelect/Block/1000of1000000-4             6004019        6004018        -0.00%
BenchmarkQuerierSelect/Block/10000of1000000-4            6040019        6040018        -0.00%
BenchmarkQuerierSelect/Block/100000of1000000-4           6400019        6400018        -0.00%
BenchmarkQuerierSelect/Block/1000000of1000000-4          10000020       10000019       -0.00%

benchmark                                                old bytes     new bytes     delta
BenchmarkQuerierSelect/Head/1of1000000-4                 229192200     176001176     -23.21%
BenchmarkQuerierSelect/Head/10of1000000-4                229193352     176002328     -23.21%
BenchmarkQuerierSelect/Head/100of1000000-4               229204872     176013848     -23.21%
BenchmarkQuerierSelect/Head/1000of1000000-4              229320072     176129048     -23.20%
BenchmarkQuerierSelect/Head/10000of1000000-4             230472072     177281048     -23.08%
BenchmarkQuerierSelect/Head/100000of1000000-4            241992072     188801048     -21.98%
BenchmarkQuerierSelect/Head/1000000of1000000-4           357192072     304001048     -14.89%
BenchmarkQuerierSelect/SortedHead/1of1000000-4           229193928     229193928     +0.00%
BenchmarkQuerierSelect/SortedHead/10of1000000-4          229195080     229195080     +0.00%
BenchmarkQuerierSelect/SortedHead/100of1000000-4         229206600     229206600     +0.00%
BenchmarkQuerierSelect/SortedHead/1000of1000000-4        229321800     229321800     +0.00%
BenchmarkQuerierSelect/SortedHead/10000of1000000-4       230473800     230473800     +0.00%
BenchmarkQuerierSelect/SortedHead/100000of1000000-4      241993800     241993800     +0.00%
BenchmarkQuerierSelect/SortedHead/1000000of1000000-4     357193800     357193800     +0.00%
BenchmarkQuerierSelect/Block/1of1000000-4                227201516     227201500     -0.00%
BenchmarkQuerierSelect/Block/10of1000000-4               227202924     227202908     -0.00%
BenchmarkQuerierSelect/Block/100of1000000-4              227217036     227217020     -0.00%
BenchmarkQuerierSelect/Block/1000of1000000-4             227358156     227358140     -0.00%
BenchmarkQuerierSelect/Block/10000of1000000-4            228769356     228769340     -0.00%
BenchmarkQuerierSelect/Block/100000of1000000-4           242881356     242881340     -0.00%
BenchmarkQuerierSelect/Block/1000000of1000000-4          384001616     384001600     -0.00%

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2020-01-28 09:14:56 +00:00

1257 lines
28 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"sort"
"strings"
"unicode/utf8"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
// Querier provides querying access over time series data of a fixed
// time range.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
// No ordering of the returned series is promised by this method; use
// SelectSorted when sorted output is required.
Select(...*labels.Matcher) (SeriesSet, error)
// SelectSorted returns a sorted set of series that matches the given label matchers.
SelectSorted(...*labels.Matcher) (SeriesSet, error)
// LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifetime of the querier.
LabelValues(string) ([]string, error)
// LabelNames returns all the unique label names present in the block in sorted order.
LabelNames() ([]string, error)
// Close releases the resources of the Querier.
Close() error
}
// Series exposes a single time series: its identifying labels and a way to
// iterate over its samples.
type Series interface {
// Labels returns the complete set of labels identifying the series.
Labels() labels.Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
// querier aggregates querying results from time blocks within
// a single partition.
type querier struct {
// blocks holds one Querier per block; their results are merged by the
// methods of querier.
blocks []Querier
}
// LabelValues returns the values of label n, merged across all blocks of the
// querier.
func (q *querier) LabelValues(n string) ([]string, error) {
	values, err := q.lvals(q.blocks, n)
	return values, err
}
// LabelNames returns the sorted, de-duplicated union of the label names
// reported by every block of the querier.
func (q *querier) LabelNames() ([]string, error) {
	// Collect names into a set so that a name present in several blocks is
	// reported only once.
	seen := make(map[string]struct{})
	for _, blk := range q.blocks {
		blockNames, err := blk.LabelNames()
		if err != nil {
			return nil, errors.Wrap(err, "LabelNames() from Querier")
		}
		for i := range blockNames {
			seen[blockNames[i]] = struct{}{}
		}
	}

	unique := make([]string, 0, len(seen))
	for n := range seen {
		unique = append(unique, n)
	}
	// Map iteration order is random, so sort for a deterministic result.
	sort.Strings(unique)
	return unique, nil
}
// lvals returns the values of label n across the queriers qs. It splits qs in
// half recursively and combines the two halves with mergeStrings.
func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
	switch len(qs) {
	case 0:
		return nil, nil
	case 1:
		return qs[0].LabelValues(n)
	}

	mid := len(qs) / 2
	left, err := q.lvals(qs[:mid], n)
	if err != nil {
		return nil, err
	}
	right, err := q.lvals(qs[mid:], n)
	if err != nil {
		return nil, err
	}
	return mergeStrings(left, right), nil
}
// Select returns the series matching ms. With exactly one block the block's
// own Select is used directly; in every other case it defers to SelectSorted.
func (q *querier) Select(ms ...*labels.Matcher) (SeriesSet, error) {
	if len(q.blocks) == 1 {
		// Sorting Head series is slow, and unneeded when only the
		// Head is being queried. Sorting blocks is a noop.
		return q.blocks[0].Select(ms...)
	}
	return q.SelectSorted(ms...)
}
// SelectSorted runs a sorted select against every block and returns the
// results combined through NewMergedSeriesSet. Without any block it returns
// the empty series set.
func (q *querier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) {
	if len(q.blocks) == 0 {
		return EmptySeriesSet(), nil
	}

	sets := make([]SeriesSet, 0, len(q.blocks))
	for _, blk := range q.blocks {
		set, err := blk.SelectSorted(ms...)
		if err != nil {
			return nil, err
		}
		sets = append(sets, set)
	}
	return NewMergedSeriesSet(sets), nil
}
// Close closes every block querier and returns the collected errors, if any.
// All blocks are closed even when an earlier one fails.
func (q *querier) Close() error {
	var closeErrs tsdb_errors.MultiError
	for i := range q.blocks {
		closeErrs.Add(q.blocks[i].Close())
	}
	return closeErrs.Err()
}
// verticalQuerier aggregates querying results from time blocks within
// a single partition. The block time ranges can be overlapping.
// It embeds querier and overrides Select and SelectSorted.
type verticalQuerier struct {
querier
}
// Select returns the series matching ms, merged across all blocks via sel.
func (q *verticalQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) {
	merged, err := q.sel(q.blocks, ms)
	return merged, err
}
// SelectSorted returns the series matching ms, merged across all blocks via
// sel. It is implemented exactly like Select.
func (q *verticalQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) {
	merged, err := q.sel(q.blocks, ms)
	return merged, err
}
// sel selects the series matching ms from the queriers qs. A single querier
// is queried with SelectSorted; larger inputs are split in half recursively
// and the halves are combined with newMergedVerticalSeriesSet.
func (q *verticalQuerier) sel(qs []Querier, ms []*labels.Matcher) (SeriesSet, error) {
	switch len(qs) {
	case 0:
		return EmptySeriesSet(), nil
	case 1:
		return qs[0].SelectSorted(ms...)
	}

	mid := len(qs) / 2
	left, err := q.sel(qs[:mid], ms)
	if err != nil {
		return nil, err
	}
	right, err := q.sel(qs[mid:], ms)
	if err != nil {
		return nil, err
	}
	return newMergedVerticalSeriesSet(left, right), nil
}
// NewBlockQuerier returns a querier against the reader, restricted to the
// time range [mint, maxt]. It opens the index, chunk and tombstone readers of
// b; if a later reader fails to open, the ones already opened are closed
// before the error is returned.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
	ir, err := b.Index()
	if err != nil {
		return nil, errors.Wrap(err, "open index reader")
	}

	cr, err := b.Chunks()
	if err != nil {
		ir.Close()
		return nil, errors.Wrap(err, "open chunk reader")
	}

	tr, err := b.Tombstones()
	if err != nil {
		ir.Close()
		cr.Close()
		return nil, errors.Wrap(err, "open tombstone reader")
	}

	bq := &blockQuerier{
		mint:       mint,
		maxt:       maxt,
		index:      ir,
		chunks:     cr,
		tombstones: tr,
	}
	return bq, nil
}
// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
// index, chunks and tombstones are the readers the querier works on; all
// three are closed by Close.
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
// closed is set by the first Close so that a second Close returns an error.
closed bool
// mint and maxt are handed to the series sets built by Select and SelectSorted.
mint, maxt int64
}
// Select returns the series of the block matching ms, looked up without
// requesting sorted postings.
func (q *blockQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) {
	base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
	if err != nil {
		return nil, err
	}

	// Attach chunk data to the series found in the index.
	populated := &populatedChunkSeries{
		set:    base,
		chunks: q.chunks,
		mint:   q.mint,
		maxt:   q.maxt,
	}
	return &blockSeriesSet{set: populated, mint: q.mint, maxt: q.maxt}, nil
}
// SelectSorted returns the series of the block matching ms, looked up with
// sorted postings.
func (q *blockQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) {
	base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
	if err != nil {
		return nil, err
	}

	// Attach chunk data to the series found in the index.
	populated := &populatedChunkSeries{
		set:    base,
		chunks: q.chunks,
		mint:   q.mint,
		maxt:   q.maxt,
	}
	return &blockSeriesSet{set: populated, mint: q.mint, maxt: q.maxt}, nil
}
// LabelValues returns the values of the label name as reported by the
// block's index reader.
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
	values, err := q.index.LabelValues(name)
	return values, err
}
// LabelNames returns the label names as reported by the block's index reader.
func (q *blockQuerier) LabelNames() ([]string, error) {
	names, err := q.index.LabelNames()
	return names, err
}
// LabelValuesFor is not implemented; it always returns an error.
func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
	err := fmt.Errorf("not implemented")
	return nil, err
}
// Close closes the index, chunk and tombstone readers and returns their
// combined errors. Calling Close on an already closed querier returns an
// error without touching the readers again.
func (q *blockQuerier) Close() error {
	if q.closed {
		return errors.New("block querier already closed")
	}

	var closeErrs tsdb_errors.MultiError
	closeErrs.Add(q.index.Close())
	closeErrs.Add(q.chunks.Close())
	closeErrs.Add(q.tombstones.Close())

	// Mark as closed even if some reader failed to close.
	q.closed = true
	return closeErrs.Err()
}
// regexMetaCharacterBytes is a 128-bit bitmap, filled in by init, that
// isRegexMetaCharacter consults. The bit for byte b lives in element b%16 at
// bit position b/16.
var regexMetaCharacterBytes [16]byte

// isRegexMetaCharacter reports whether byte b needs to be escaped.
// Bytes outside the ASCII range are never meta characters.
func isRegexMetaCharacter(b byte) bool {
	if b >= utf8.RuneSelf {
		return false
	}
	mask := byte(1) << (b / 16)
	return regexMetaCharacterBytes[b%16]&mask != 0
}

// init marks every regex meta character in regexMetaCharacterBytes.
func init() {
	const metaChars = `.+*?()|[]{}^$`
	for i := 0; i < len(metaChars); i++ {
		c := metaChars[i]
		regexMetaCharacterBytes[c%16] |= 1 << (c / 16)
	}
}
// findSetMatches extracts the literal alternatives from a pattern of the form
// "^(?:a|b|c)$", which is how Prometheus wraps regex matcher values. It
// returns nil when the wrapper is missing or when the inner pattern is
// anything other than a plain '|'-separated list of literals, so that the
// caller falls back to regular regex matching.
//
// Inside the list a backslash may escape a regex meta character or another
// backslash; the escaped byte is taken literally. Any other escape, an
// unescaped meta character other than '|', or a dangling backslash at the end
// of the list makes the pattern a non-set pattern. Empty alternatives are
// dropped from the result.
func findSetMatches(pattern string) []string {
	const prefix, suffix = "^(?:", ")$"

	// Return empty matches if the wrapper from Prometheus is missing.
	if len(pattern) < len(prefix)+len(suffix) ||
		!strings.HasPrefix(pattern, prefix) ||
		!strings.HasSuffix(pattern, suffix) {
		return nil
	}
	inner := pattern[len(prefix) : len(pattern)-len(suffix)]

	escaped := false
	sets := []*strings.Builder{{}}
	for i := 0; i < len(inner); i++ {
		c := inner[i]
		cur := sets[len(sets)-1]

		if escaped {
			// Only meta characters and the backslash itself may be escaped.
			if c != '\\' && !isRegexMetaCharacter(c) {
				return nil
			}
			cur.WriteByte(c)
			escaped = false
			continue
		}

		switch {
		case c == '|':
			// Start the next alternative.
			sets = append(sets, &strings.Builder{})
		case isRegexMetaCharacter(c):
			// Any other unescaped meta character means a real regex.
			return nil
		case c == '\\':
			escaped = true
		default:
			cur.WriteByte(c)
		}
	}
	// A backslash with nothing after it does not escape anything; treat the
	// pattern as a non-set pattern instead of silently dropping the backslash.
	if escaped {
		return nil
	}

	matches := make([]string, 0, len(sets))
	for _, s := range sets {
		if s.Len() > 0 {
			matches = append(matches, s.String())
		}
	}
	return matches
}
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
//
// Matchers are turned into two lists: postings to intersect and postings to
// subtract from that intersection afterwards.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
	var intersect, subtract []index.Postings

	// See which label must be non-empty: a label is required as soon as one
	// of its matchers rejects the empty value.
	// Optimization for case like {l=~".", l!="1"}.
	required := make(map[string]bool, len(ms))
	for _, m := range ms {
		if !m.Matches("") {
			required[m.Name] = true
		}
	}

	for _, m := range ms {
		if !required[m.Name] { // l=""
			// If the matchers for a labelname selects an empty value, it selects all
			// the series which don't have the label name set too. See:
			// https://github.com/prometheus/prometheus/issues/3575 and
			// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
			p, err := inversePostingsForMatcher(ix, m)
			if err != nil {
				return nil, err
			}
			subtract = append(subtract, p)
			continue
		}

		// The label must be non-empty, so we can be smarter.
		matchesEmpty := m.Matches("")
		negated := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
		switch {
		case negated && matchesEmpty: // l!="foo"
			// The inner matcher doesn't match empty, so subtract its
			// postings out at the end.
			inv, err := m.Inverse()
			if err != nil {
				return nil, err
			}
			p, err := postingsForMatcher(ix, inv)
			if err != nil {
				return nil, err
			}
			subtract = append(subtract, p)
		case negated: // l!=""
			// The inner matcher can match empty, so we need to use
			// inversePostingsForMatcher on it.
			inv, err := m.Inverse()
			if err != nil {
				return nil, err
			}
			p, err := inversePostingsForMatcher(ix, inv)
			if err != nil {
				return nil, err
			}
			intersect = append(intersect, p)
		default: // l="a"
			// Non-Not matcher, use normal postingsForMatcher.
			p, err := postingsForMatcher(ix, m)
			if err != nil {
				return nil, err
			}
			intersect = append(intersect, p)
		}
	}

	// If there's nothing to subtract from, add in everything and remove the
	// subtracted postings later.
	if len(intersect) == 0 && len(subtract) != 0 {
		k, v := index.AllPostingsKey()
		all, err := ix.Postings(k, v)
		if err != nil {
			return nil, err
		}
		intersect = append(intersect, all)
	}

	result := index.Intersect(intersect...)
	for _, n := range subtract {
		result = index.Without(result, n)
	}
	return result, nil
}
// postingsForMatcher returns the postings of all series whose value for
// m.Name matches m. This method will not return postings for missing labels.
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
	switch m.Type {
	case labels.MatchEqual:
		// Fast-path for equal matching.
		return ix.Postings(m.Name, m.Value)
	case labels.MatchRegexp:
		// Fast-path for set matching.
		if set := findSetMatches(m.Value); len(set) > 0 {
			sort.Strings(set)
			return ix.Postings(m.Name, set...)
		}
	}

	// Slow path: test the matcher against every known value of the label.
	all, err := ix.LabelValues(m.Name)
	if err != nil {
		return nil, err
	}

	var matched []string
	for _, v := range all {
		if m.Matches(v) {
			matched = append(matched, v)
		}
	}
	if len(matched) == 0 {
		return index.EmptyPostings(), nil
	}
	return ix.Postings(m.Name, matched...)
}
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
	all, err := ix.LabelValues(m.Name)
	if err != nil {
		return nil, err
	}

	// Keep only the values the matcher rejects.
	var rejected []string
	for i := range all {
		if v := all[i]; !m.Matches(v) {
			rejected = append(rejected, v)
		}
	}
	return ix.Postings(m.Name, rejected...)
}
// mergeStrings merges the two slices a and b into a new slice by repeatedly
// taking the smaller head element; an element present at the head of both
// inputs is emitted once. For sorted inputs the result is the sorted union.
// The inputs are not modified.
func mergeStrings(a, b []string) []string {
	longest := len(a)
	if len(b) > longest {
		longest = len(b)
	}
	// Reserve a bit more than the longer input to absorb some non-shared elements.
	merged := make([]string, 0, longest*10/9)

	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch c := strings.Compare(a[i], b[j]); {
		case c < 0:
			merged = append(merged, a[i])
			i++
		case c > 0:
			merged = append(merged, b[j])
			j++
		default:
			merged = append(merged, a[i])
			i++
			j++
		}
	}

	// Append all remaining elements.
	merged = append(merged, a[i:]...)
	merged = append(merged, b[j:]...)
	return merged
}
// SeriesSet contains a set of series.
type SeriesSet interface {
// Next advances to the next series and reports whether there is one.
Next() bool
// At returns the current series.
At() Series
// Err returns the error of the set, if any.
Err() error
}
// emptySeriesSet is the shared value handed out by EmptySeriesSet.
var emptySeriesSet = errSeriesSet{}

// EmptySeriesSet returns a series set that's always empty.
func EmptySeriesSet() SeriesSet {
	var empty SeriesSet = emptySeriesSet
	return empty
}
// mergedSeriesSet returns a series sets slice as a single series set. The input series sets
// must be sorted and sequential in time.
type mergedSeriesSet struct {
// all holds the series sets that still have a current element.
all []SeriesSet
buf []SeriesSet // A buffer for keeping the order of SeriesSet slice during forwarding the SeriesSet.
ids []int // The indices of chosen SeriesSet for the current run.
// done is set once iteration has finished or an error was hit.
done bool
// err is the first error reported by one of the underlying sets.
err error
// cur is the series returned by At.
cur Series
}
// NewMergedSeriesSet returns a single SeriesSet over all the given sets. A
// single input set is returned as is. Otherwise every input is advanced once
// up front, because Next of the merged set works with one element of
// look-ahead.
func NewMergedSeriesSet(all []SeriesSet) SeriesSet {
	if len(all) == 1 {
		return all[0]
	}

	merged := &mergedSeriesSet{all: all}
	// Initialize first elements of all sets as Next() needs
	// one element look-ahead.
	merged.nextAll()
	// Nothing left after the initial advance means there is nothing to iterate.
	merged.done = merged.done || len(merged.all) == 0
	return merged
}
// At returns the series selected by the most recent call to Next.
func (s *mergedSeriesSet) At() Series {
	current := s.cur
	return current
}

// Err returns the error recorded while advancing the underlying sets, if any.
func (s *mergedSeriesSet) Err() error {
	recorded := s.err
	return recorded
}
// nextAll is to call Next() for all SeriesSet.
// Because the order of the SeriesSet slice will affect the results,
// we need to use an buffer slice to hold the order.
//
// Sets that still have an element are appended to s.buf in their original
// order; exhausted sets are dropped. If an exhausted set reports an error,
// the merged set is marked done with that error and the scan stops early.
// Finally s.all and s.buf are swapped, so s.all holds the surviving sets and
// the previous slice is kept for reuse as the buffer.
func (s *mergedSeriesSet) nextAll() {
s.buf = s.buf[:0]
for _, ss := range s.all {
if ss.Next() {
s.buf = append(s.buf, ss)
} else if ss.Err() != nil {
s.done = true
s.err = ss.Err()
break
}
}
s.all, s.buf = s.buf, s.all
}
// nextWithID is to call Next() for the SeriesSet with the indices of s.ids.
// Because the order of the SeriesSet slice will affect the results,
// we need to use an buffer slice to hold the order.
//
// i1 walks s.all and i2 walks s.ids in lockstep, so the indices in s.ids are
// expected to be in increasing order. Sets not listed in s.ids are copied to
// the buffer untouched. A listed set is advanced: it is kept if it has a next
// element, dropped if it is exhausted, and if it reports an error the merged
// set is marked done with that error and the scan stops. At the end s.all and
// s.buf are swapped.
func (s *mergedSeriesSet) nextWithID() {
if len(s.ids) == 0 {
return
}
s.buf = s.buf[:0]
i1 := 0
i2 := 0
for i1 < len(s.all) {
if i2 < len(s.ids) && i1 == s.ids[i2] {
if !s.all[s.ids[i2]].Next() {
if s.all[s.ids[i2]].Err() != nil {
s.done = true
s.err = s.all[s.ids[i2]].Err()
break
}
// Exhausted without error: skip the set so it is not carried over.
i2++
i1++
continue
}
i2++
}
s.buf = append(s.buf, s.all[i1])
i1++
}
s.all, s.buf = s.buf, s.all
}
// Next advances the merged set to the series with the lowest labels among
// the current elements of all underlying sets and reports whether there is
// one. Sets that supplied the previously returned series are advanced first.
// If several sets are positioned on the same labels, their series are
// combined into one chainedSeries.
func (s *mergedSeriesSet) Next() bool {
if s.done {
return false
}
// Move past the series that were returned by the previous call.
s.nextWithID()
if s.done {
return false
}
s.ids = s.ids[:0]
if len(s.all) == 0 {
s.done = true
return false
}
// Here we are looking for a set of series sets with the lowest labels,
// and we will cache their indexes in s.ids.
s.ids = append(s.ids, 0)
for i := 1; i < len(s.all); i++ {
cmp := labels.Compare(s.all[s.ids[0]].At().Labels(), s.all[i].At().Labels())
if cmp > 0 {
// Found strictly lower labels: restart the candidate list with i.
s.ids = s.ids[:1]
s.ids[0] = i
} else if cmp == 0 {
s.ids = append(s.ids, i)
}
}
if len(s.ids) > 1 {
series := make([]Series, len(s.ids))
for i, idx := range s.ids {
series[i] = s.all[idx].At()
}
s.cur = &chainedSeries{series: series}
} else {
s.cur = s.all[s.ids[0]].At()
}
return true
}
// mergedVerticalSeriesSet merges the two series sets a and b into one.
type mergedVerticalSeriesSet struct {
// a and b are the two input sets.
a, b SeriesSet
// cur is the series returned by At.
cur Series
// adone and bdone record that the last Next call on a or b returned false.
adone, bdone bool
}
// NewMergedVerticalSeriesSet takes two series sets as a single series set.
// The input series sets must be sorted and
// the time ranges of the series can be overlapping.
func NewMergedVerticalSeriesSet(a, b SeriesSet) SeriesSet {
	merged := newMergedVerticalSeriesSet(a, b)
	return merged
}
// newMergedVerticalSeriesSet builds the merged set over a and b and advances
// both inputs once, a first and then b.
func newMergedVerticalSeriesSet(a, b SeriesSet) *mergedVerticalSeriesSet {
	// Initialize first elements of both sets as Next() needs
	// one element look-ahead.
	aHasNext := a.Next()
	bHasNext := b.Next()
	return &mergedVerticalSeriesSet{
		a:     a,
		b:     b,
		adone: !aHasNext,
		bdone: !bHasNext,
	}
}
// At returns the series selected by the most recent call to Next.
func (s *mergedVerticalSeriesSet) At() Series {
	current := s.cur
	return current
}

// Err returns the error of set a if it has one, otherwise the error of set b.
func (s *mergedVerticalSeriesSet) Err() error {
	if err := s.a.Err(); err != nil {
		return err
	}
	return s.b.Err()
}
// compare orders the current series of a and b by labels. An exhausted set
// sorts last: it returns 1 when a is done, -1 when only b is done, and the
// result of labels.Compare otherwise.
func (s *mergedVerticalSeriesSet) compare() int {
	switch {
	case s.adone:
		return 1
	case s.bdone:
		return -1
	default:
		return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
	}
}
// Next selects the next series in label order from a and b and advances the
// set(s) it was taken from. It returns false once both sets are exhausted or
// either of them reports an error.
func (s *mergedVerticalSeriesSet) Next() bool {
	if (s.adone && s.bdone) || s.Err() != nil {
		return false
	}

	switch order := s.compare(); {
	case order > 0:
		// b holds the lower labels.
		s.cur = s.b.At()
		s.bdone = !s.b.Next()
	case order < 0:
		// a holds the lower labels.
		s.cur = s.a.At()
		s.adone = !s.a.Next()
	default:
		// Both sets contain the current series. Chain them into a single one.
		s.cur = &verticalChainedSeries{series: []Series{s.a.At(), s.b.At()}}
		s.adone = !s.a.Next()
		s.bdone = !s.b.Next()
	}
	return true
}
// ChunkSeriesSet exposes the chunks and intervals of a series instead of the
// actual series itself.
type ChunkSeriesSet interface {
// Next advances to the next series and reports whether there is one.
Next() bool
// At returns the labels, chunk metas and tombstone intervals of the
// current series.
At() (labels.Labels, []chunks.Meta, tombstones.Intervals)
// Err returns the error of the set, if any.
Err() error
}
// baseChunkSeries loads the label set and chunk references for a postings
// list from an index. It filters out series that have labels set that should be unset.
type baseChunkSeries struct {
// p is the postings list being walked.
p index.Postings
// index resolves a posting to its labels and chunk metas.
index IndexReader
// tombstones supplies the deleted intervals per series.
tombstones tombstones.Reader
// lset, chks and intervals describe the current series, as returned by At.
lset labels.Labels
chks []chunks.Meta
intervals tombstones.Intervals
// err is the error returned by Err.
err error
}
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader.
// The postings are not sorted.
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) {
	const sorted = false
	return lookupChunkSeries(sorted, ir, tr, ms...)
}
// LookupChunkSeriesSorted retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader. Series will be in order.
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) {
	const sorted = true
	return lookupChunkSeries(sorted, ir, tr, ms...)
}
// lookupChunkSeries builds a baseChunkSeries over the postings matching ms.
// A nil tombstone reader is replaced by an empty in-memory one. When sorted
// is true the postings are passed through the index reader's SortedPostings.
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) {
	if tr == nil {
		tr = tombstones.NewMemTombstones()
	}

	postings, err := PostingsForMatchers(ir, ms...)
	if err != nil {
		return nil, err
	}
	if sorted {
		postings = ir.SortedPostings(postings)
	}

	set := &baseChunkSeries{
		p:          postings,
		index:      ir,
		tombstones: tr,
	}
	return set, nil
}
// At returns the labels, chunk metas and tombstone intervals loaded by the
// most recent call to Next.
func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	lset, chks, intervals := s.lset, s.chks, s.intervals
	return lset, chks, intervals
}

// Err returns the error that stopped iteration, if any.
func (s *baseChunkSeries) Err() error {
	return s.err
}
// Next advances to the next posting whose series exists in the index and
// loads its labels, chunk metas and tombstone intervals. It returns false
// when the postings are exhausted or an error occurred; Err tells the two
// apart.
func (s *baseChunkSeries) Next() bool {
var (
// Fresh slices are allocated on every call, sized like the previous result,
// instead of reusing s.lset and s.chks.
// NOTE(review): presumably so that values handed out by an earlier At()
// are not overwritten — confirm against callers.
lset = make(labels.Labels, len(s.lset))
chkMetas = make([]chunks.Meta, len(s.chks))
err error
)
for s.p.Next() {
ref := s.p.At()
// This err is scoped to the if statement and shadows the one declared above.
if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == ErrNotFound {
continue
}
s.err = err
return false
}
s.lset = lset
s.chks = chkMetas
s.intervals, err = s.tombstones.Get(s.p.At())
if err != nil {
s.err = errors.Wrap(err, "get tombstones")
return false
}
if len(s.intervals) > 0 {
// Only those chunks that are not entirely deleted.
chks := make([]chunks.Meta, 0, len(s.chks))
for _, chk := range s.chks {
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(s.intervals)) {
chks = append(chks, chk)
}
}
s.chks = chks
}
return true
}
// The postings ran out; surface their error, if any.
if err := s.p.Err(); err != nil {
s.err = err
}
return false
}
// populatedChunkSeries loads chunk data from a store for a set of series
// with known chunk references. It filters out chunks that do not fit the
// given time range.
type populatedChunkSeries struct {
// set is the underlying series set providing labels and chunk metas.
set ChunkSeriesSet
// chunks is the reader the chunk data is loaded from.
chunks ChunkReader
// mint and maxt bound the time range chunks must overlap with.
mint, maxt int64
// err is the error returned by Err.
err error
// chks, lset and intervals describe the current series, as returned by At.
chks []chunks.Meta
lset labels.Labels
intervals tombstones.Intervals
}
// At returns the labels, populated chunk metas and tombstone intervals
// stored by the most recent successful call to Next.
func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	lset, chks, intervals := s.lset, s.chks, s.intervals
	return lset, chks, intervals
}

// Err returns the error that stopped iteration, if any.
func (s *populatedChunkSeries) Err() error {
	return s.err
}
// Next advances to the next series of the underlying set that has at least
// one chunk overlapping [s.mint, s.maxt] and loads the data of those chunks
// from the chunk reader. It returns false when the underlying set is
// exhausted or loading a chunk failed.
func (s *populatedChunkSeries) Next() bool {
for s.set.Next() {
lset, chks, dranges := s.set.At()
// Drop leading chunks that end before mint.
for len(chks) > 0 {
if chks[0].MaxTime >= s.mint {
break
}
chks = chks[1:]
}
// This is to delete in place while iterating.
for i, rlen := 0, len(chks); i < rlen; i++ {
// j is i corrected by the number of elements removed so far. As written,
// chks is only shortened right before leaving the loop, so j equals i.
j := i - (rlen - len(chks))
c := &chks[j]
// Break out at the first chunk that has no overlap with mint, maxt.
if c.MinTime > s.maxt {
chks = chks[:j]
break
}
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
if s.err != nil {
// This means that the chunk has been garbage collected. Remove it from the list.
// NOTE(review): this path clears s.err, stores the shortened list in s.chks
// and then still returns false, so iteration stops without an error being
// reported — confirm this is intended.
if s.err == ErrNotFound {
s.err = nil
// Delete in-place.
s.chks = append(chks[:j], chks[j+1:]...)
}
return false
}
}
// No chunk of this series overlaps the time range; try the next series.
if len(chks) == 0 {
continue
}
s.lset = lset
s.chks = chks
s.intervals = dranges
return true
}
if err := s.set.Err(); err != nil {
s.err = err
}
return false
}
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
// set is the underlying chunk series set each Series is built from.
set ChunkSeriesSet
// err is the error returned by Err.
err error
// cur is the series returned by At.
cur Series
// mint and maxt are copied into every series created by Next.
mint, maxt int64
}
// Next advances the underlying chunk series set by one element and wraps it
// into a chunkSeries carrying the time range of the set. It returns false
// when the underlying set has no further element; in that case the error of
// the underlying set, if any, is recorded and available through Err.
func (s *blockSeriesSet) Next() bool {
	if s.set.Next() {
		lset, chks, dranges := s.set.At()
		s.cur = &chunkSeries{
			labels:    lset,
			chunks:    chks,
			mint:      s.mint,
			maxt:      s.maxt,
			intervals: dranges,
		}
		return true
	}
	if err := s.set.Err(); err != nil {
		s.err = err
	}
	return false
}
// At returns the series created by the most recent successful call to Next.
func (s *blockSeriesSet) At() Series {
	return s.cur
}

// Err returns the error taken over from the underlying set, if any.
func (s *blockSeriesSet) Err() error {
	return s.err
}
// chunkSeries is a series that is backed by a sequence of chunks holding
// time series data.
type chunkSeries struct {
// labels identifies the series.
labels labels.Labels
chunks []chunks.Meta // in-order chunk refs
// mint and maxt are passed to the iterator created by Iterator.
mint, maxt int64
// intervals are the tombstone intervals passed to the iterator.
intervals tombstones.Intervals
}
// Labels returns the label set identifying the series.
func (s *chunkSeries) Labels() labels.Labels {
	lset := s.labels
	return lset
}

// Iterator returns a new iterator over the chunks of the series, limited to
// the series' time range and tombstone intervals.
func (s *chunkSeries) Iterator() SeriesIterator {
	it := newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
	return it
}
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
// chainedSeries implements a series for a list of time-sorted series.
// They all must have the same labels.
type chainedSeries struct {
// series are the chained parts; Labels reads the labels of the first one.
series []Series
}
// Labels returns the label set of the first chained series.
func (s *chainedSeries) Labels() labels.Labels {
	first := s.series[0]
	return first.Labels()
}

// Iterator returns an iterator that walks the chained series one after
// another.
func (s *chainedSeries) Iterator() SeriesIterator {
	it := newChainedSeriesIterator(s.series...)
	return it
}
// chainedSeriesIterator implements a series iterator over a list
// of time-sorted, non-overlapping iterators.
type chainedSeriesIterator struct {
series []Series // series in time order
// i is the index into series of the one currently being iterated.
i int
// cur is the iterator over series[i].
cur SeriesIterator
}
// newChainedSeriesIterator returns an iterator positioned on the first of
// the given series. At least one series must be passed.
func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator {
	it := &chainedSeriesIterator{series: s}
	it.cur = s[0].Iterator()
	return it
}
// Seek moves to the first sample at or after t, starting the search with the
// series currently being iterated. On success the iterator is positioned on
// the series that satisfied the seek; on failure its position is unchanged.
func (it *chainedSeriesIterator) Seek(t int64) bool {
	// We just scan the chained series sequentially as they are already
	// pre-selected by relevant time and should be accessed sequentially anyway.
	for idx := it.i; idx < len(it.series); idx++ {
		candidate := it.series[idx].Iterator()
		if candidate.Seek(t) {
			it.cur = candidate
			it.i = idx
			return true
		}
	}
	return false
}
// Next advances to the next sample, moving on to the following series
// whenever the current one is exhausted. It returns false on an error of the
// current iterator or once the last series has been consumed.
func (it *chainedSeriesIterator) Next() bool {
	for {
		if it.cur.Next() {
			return true
		}
		if err := it.cur.Err(); err != nil {
			return false
		}
		if it.i == len(it.series)-1 {
			return false
		}
		// Current series is exhausted; continue with the next one.
		it.i++
		it.cur = it.series[it.i].Iterator()
	}
}
// At returns the sample the current underlying iterator is positioned on.
func (it *chainedSeriesIterator) At() (t int64, v float64) {
	t, v = it.cur.At()
	return t, v
}

// Err returns the error of the current underlying iterator.
func (it *chainedSeriesIterator) Err() error {
	err := it.cur.Err()
	return err
}
// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series.
// They all must have the same labels.
type verticalChainedSeries struct {
// series are the overlapping parts; Labels reads the labels of the first one.
series []Series
}
// Labels returns the label set of the first chained series.
func (s *verticalChainedSeries) Labels() labels.Labels {
	first := s.series[0]
	return first.Labels()
}

// Iterator returns an iterator that merges the samples of all chained series.
func (s *verticalChainedSeries) Iterator() SeriesIterator {
	it := newVerticalMergeSeriesIterator(s.series...)
	return it
}
// verticalMergeSeriesIterator implements a series iterator over a list
// of time-sorted, time-overlapping iterators.
type verticalMergeSeriesIterator struct {
// a and b are the two iterators being merged.
a, b SeriesIterator
// aok and bok hold the result of the last advance of a and b; initialized
// records whether that first advance has happened yet.
aok, bok, initialized bool
// curT and curV are the sample returned by At.
curT int64
curV float64
}
// newVerticalMergeSeriesIterator returns an iterator merging the samples of
// the given series. One series yields its own iterator; for more, the first
// series is merged with the recursively built iterator over the rest. At
// least one series must be passed.
func newVerticalMergeSeriesIterator(s ...Series) SeriesIterator {
	if len(s) == 1 {
		return s[0].Iterator()
	}
	// For exactly two series the recursion bottoms out immediately and the
	// second operand is simply s[1].Iterator().
	first := s[0].Iterator()
	rest := newVerticalMergeSeriesIterator(s[1:]...)
	return &verticalMergeSeriesIterator{a: first, b: rest}
}
// Seek seeks both underlying iterators to t, a first and then b, and then
// picks the merged sample through Next.
func (it *verticalMergeSeriesIterator) Seek(t int64) bool {
	it.aok = it.a.Seek(t)
	it.bok = it.b.Seek(t)
	it.initialized = true
	return it.Next()
}
// Next stores the pending sample with the lower timestamp as the current
// sample and advances the iterator it came from. When both pending samples
// share a timestamp, the value of b is used and both iterators advance.
func (it *verticalMergeSeriesIterator) Next() bool {
	if !it.initialized {
		// First use: load the initial sample of both iterators.
		it.aok = it.a.Next()
		it.bok = it.b.Next()
		it.initialized = true
	}

	switch {
	case !it.aok && !it.bok:
		return false
	case !it.aok:
		// Only b has samples left.
		it.curT, it.curV = it.b.At()
		it.bok = it.b.Next()
		return true
	case !it.bok:
		// Only a has samples left.
		it.curT, it.curV = it.a.At()
		it.aok = it.a.Next()
		return true
	}

	at, av := it.a.At()
	bt, bv := it.b.At()
	switch {
	case at < bt:
		it.curT, it.curV = at, av
		it.aok = it.a.Next()
	case at > bt:
		it.curT, it.curV = bt, bv
		it.bok = it.b.Next()
	default:
		// Same timestamp in both: take b's value and move both forward.
		it.curT, it.curV = bt, bv
		it.aok = it.a.Next()
		it.bok = it.b.Next()
	}
	return true
}
// At returns the sample selected by the most recent call to Next or Seek.
func (it *verticalMergeSeriesIterator) At() (t int64, v float64) {
	t, v = it.curT, it.curV
	return t, v
}

// Err returns the error of iterator a if it has one, otherwise the error of
// iterator b.
func (it *verticalMergeSeriesIterator) Err() error {
	if err := it.a.Err(); err != nil {
		return err
	}
	return it.b.Err()
}
// chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct {
// chunks are the chunks to iterate over.
chunks []chunks.Meta
// i is the index of the chunk currently being iterated.
i int
// cur is the iterator over chunks[i]; it is bufDelIter when intervals is non-empty.
cur chunkenc.Iterator
// bufDelIter is created on first use and then reused for every chunk.
bufDelIter *deletedIterator
// maxt and mint bound the timestamps Next and Seek will return.
maxt, mint int64
// intervals are the tombstone intervals handed to bufDelIter.
intervals tombstones.Intervals
}
// newChunkSeriesIterator returns an iterator over the chunks cs that is
// positioned on the first chunk. dranges are the tombstone intervals and
// mint/maxt the time range of the iterator.
func newChunkSeriesIterator(cs []chunks.Meta, dranges tombstones.Intervals, mint, maxt int64) *chunkSeriesIterator {
	it := &chunkSeriesIterator{
		chunks:    cs,
		mint:      mint,
		maxt:      maxt,
		intervals: dranges,
	}
	it.resetCurIterator()
	return it
}
// resetCurIterator points it.cur at the start of the chunk with index it.i.
// The previous iterator is passed to the chunk's Iterator method. When
// tombstone intervals are present, the chunk iterator is wrapped in the
// iterator-owned deletedIterator, which is created once and then reused.
func (it *chunkSeriesIterator) resetCurIterator() {
	chk := it.chunks[it.i].Chunk
	if len(it.intervals) == 0 {
		it.cur = chk.Iterator(it.cur)
		return
	}

	if it.bufDelIter == nil {
		it.bufDelIter = &deletedIterator{intervals: it.intervals}
	}
	it.bufDelIter.it = chk.Iterator(it.bufDelIter.it)
	it.cur = it.bufDelIter
}
// Seek positions the iterator on the first sample with a timestamp >= t,
// after raising t to it.mint if it is lower. It returns false if t lies
// beyond it.maxt or if no such sample is found in the chunk it lands on.
// The sample found is not checked against it.maxt here.
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
if t > it.maxt {
return false
}
// Seek to the first valid value after t.
if t < it.mint {
t = it.mint
}
// Skip chunks that end before t, starting at the current chunk; the
// iterator never moves back to an earlier chunk.
for ; it.chunks[it.i].MaxTime < t; it.i++ {
if it.i == len(it.chunks)-1 {
return false
}
}
// Restart the selected chunk and scan it linearly for the first match.
it.resetCurIterator()
for it.cur.Next() {
t0, _ := it.cur.At()
if t0 >= t {
return true
}
}
return false
}
// At returns the sample the current chunk iterator is positioned on.
func (it *chunkSeriesIterator) At() (t int64, v float64) {
	t, v = it.cur.At()
	return t, v
}
// Next advances to the next sample within [it.mint, it.maxt], moving on to
// the following chunk when the current one is exhausted. It returns false
// when a sample beyond it.maxt is reached, on an error of the current chunk
// iterator, or after the last chunk.
func (it *chunkSeriesIterator) Next() bool {
if it.cur.Next() {
t, _ := it.cur.At()
// A sample before mint: jump ahead with Seek instead of stepping one by one.
if t < it.mint {
if !it.Seek(it.mint) {
return false
}
t, _ = it.At()
return t <= it.maxt
}
if t > it.maxt {
return false
}
return true
}
if err := it.cur.Err(); err != nil {
return false
}
if it.i == len(it.chunks)-1 {
return false
}
// Current chunk is exhausted: continue with the next one.
it.i++
it.resetCurIterator()
return it.Next()
}
// Err returns the error of the current chunk iterator.
func (it *chunkSeriesIterator) Err() error {
	err := it.cur.Err()
	return err
}
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {
// it is the wrapped iterator.
it chunkenc.Iterator
// intervals are the deleted time ranges; Next trims intervals it has passed.
intervals tombstones.Intervals
}
// At returns the sample the wrapped iterator is positioned on.
func (it *deletedIterator) At() (int64, float64) {
	t, v := it.it.At()
	return t, v
}
// Next advances the wrapped iterator to the next sample that does not fall
// into one of the deleted intervals and reports whether there is one.
// NOTE(review): the trimming below presumably relies on it.intervals being
// ordered by time — confirm against the tombstones package.
func (it *deletedIterator) Next() bool {
Outer:
for it.it.Next() {
ts, _ := it.it.At()
// The range expression is evaluated once, so reassigning it.intervals inside
// the loop does not disturb this iteration.
for _, tr := range it.intervals {
// Sample is deleted: skip it and fetch the next one.
if tr.InBounds(ts) {
continue Outer
}
// The interval ends before this sample: drop the leading interval and
// check the next one.
if ts > tr.Maxt {
it.intervals = it.intervals[1:]
continue
}
// The sample is neither inside tr nor past its end: keep it.
return true
}
return true
}
return false
}
// Err returns the error of the wrapped iterator.
func (it *deletedIterator) Err() error {
	err := it.it.Err()
	return err
}
// errSeriesSet is a SeriesSet that never yields a series and only carries an
// error. Its zero value is used as the empty series set.
type errSeriesSet struct {
	err error
}

// Next always reports that there is no series.
func (s errSeriesSet) Next() bool {
	return false
}

// At always returns nil since the set holds no series.
func (s errSeriesSet) At() Series {
	return nil
}

// Err returns the error the set was created with.
func (s errSeriesSet) Err() error {
	return s.err
}