mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #12831 from aknuds1/arve/posting-context
Add context argument to `tsdb.PostingsForMatchers`
This commit is contained in:
commit
9071913fd9
|
@ -544,7 +544,7 @@ func (r blockChunkReader) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete matching series between mint and maxt in the block.
|
// Delete matching series between mint and maxt in the block.
|
||||||
func (pb *Block) Delete(_ context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
|
func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
|
||||||
pb.mtx.Lock()
|
pb.mtx.Lock()
|
||||||
defer pb.mtx.Unlock()
|
defer pb.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -552,7 +552,7 @@ func (pb *Block) Delete(_ context.Context, mint, maxt int64, ms ...*labels.Match
|
||||||
return ErrClosing
|
return ErrClosing
|
||||||
}
|
}
|
||||||
|
|
||||||
p, err := PostingsForMatchers(pb.indexr, ms...)
|
p, err := PostingsForMatchers(ctx, pb.indexr, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "select series")
|
return errors.Wrap(err, "select series")
|
||||||
}
|
}
|
||||||
|
|
|
@ -1433,7 +1433,7 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match
|
||||||
|
|
||||||
ir := h.indexRange(mint, maxt)
|
ir := h.indexRange(mint, maxt)
|
||||||
|
|
||||||
p, err := PostingsForMatchers(ir, ms...)
|
p, err := PostingsForMatchers(ctx, ir, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "select series")
|
return errors.Wrap(err, "select series")
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
// nolint:revive // Many unsued function arguments in this file by design.
|
// nolint:revive // Many unused function arguments in this file by design.
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
@ -127,12 +127,12 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
||||||
return &blockQuerier{blockBaseQuerier: q}, nil
|
return &blockQuerier{blockBaseQuerier: q}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *blockQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
||||||
mint := q.mint
|
mint := q.mint
|
||||||
maxt := q.maxt
|
maxt := q.maxt
|
||||||
disableTrimming := false
|
disableTrimming := false
|
||||||
|
|
||||||
p, err := PostingsForMatchers(q.index, ms...)
|
p, err := PostingsForMatchers(ctx, q.index, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storage.ErrSeriesSet(err)
|
return storage.ErrSeriesSet(err)
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,7 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
|
||||||
return &blockChunkQuerier{blockBaseQuerier: q}, nil
|
return &blockChunkQuerier{blockBaseQuerier: q}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *blockChunkQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||||
mint := q.mint
|
mint := q.mint
|
||||||
maxt := q.maxt
|
maxt := q.maxt
|
||||||
disableTrimming := false
|
disableTrimming := false
|
||||||
|
@ -176,7 +176,7 @@ func (q *blockChunkQuerier) Select(_ context.Context, sortSeries bool, hints *st
|
||||||
maxt = hints.End
|
maxt = hints.End
|
||||||
disableTrimming = hints.DisableTrimming
|
disableTrimming = hints.DisableTrimming
|
||||||
}
|
}
|
||||||
p, err := PostingsForMatchers(q.index, ms...)
|
p, err := PostingsForMatchers(ctx, q.index, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storage.ErrChunkSeriesSet(err)
|
return storage.ErrChunkSeriesSet(err)
|
||||||
}
|
}
|
||||||
|
@ -237,7 +237,7 @@ func findSetMatches(pattern string) []string {
|
||||||
|
|
||||||
// PostingsForMatchers assembles a single postings iterator against the index reader
|
// PostingsForMatchers assembles a single postings iterator against the index reader
|
||||||
// based on the given matchers. The resulting postings are not ordered by series.
|
// based on the given matchers. The resulting postings are not ordered by series.
|
||||||
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
func PostingsForMatchers(ctx context.Context, ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
||||||
var its, notIts []index.Postings
|
var its, notIts []index.Postings
|
||||||
// See which label must be non-empty.
|
// See which label must be non-empty.
|
||||||
// Optimization for case like {l=~".", l!="1"}.
|
// Optimization for case like {l=~".", l!="1"}.
|
||||||
|
@ -267,7 +267,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
// We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
|
// We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
|
||||||
// doesn't include series that may be added to the index reader during this function call.
|
// doesn't include series that may be added to the index reader during this function call.
|
||||||
k, v := index.AllPostingsKey()
|
k, v := index.AllPostingsKey()
|
||||||
allPostings, err := ix.Postings(context.TODO(), k, v)
|
allPostings, err := ix.Postings(ctx, k, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -284,10 +284,13 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, m := range ms {
|
for _, m := range ms {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
switch {
|
switch {
|
||||||
case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least.
|
case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least.
|
||||||
k, v := index.AllPostingsKey()
|
k, v := index.AllPostingsKey()
|
||||||
allPostings, err := ix.Postings(context.TODO(), k, v)
|
allPostings, err := ix.Postings(ctx, k, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -305,7 +308,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
it, err := postingsForMatcher(ix, inverse)
|
it, err := postingsForMatcher(ctx, ix, inverse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -318,7 +321,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
it, err := inversePostingsForMatcher(ix, inverse)
|
it, err := inversePostingsForMatcher(ctx, ix, inverse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -328,7 +331,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
its = append(its, it)
|
its = append(its, it)
|
||||||
default: // l="a"
|
default: // l="a"
|
||||||
// Non-Not matcher, use normal postingsForMatcher.
|
// Non-Not matcher, use normal postingsForMatcher.
|
||||||
it, err := postingsForMatcher(ix, m)
|
it, err := postingsForMatcher(ctx, ix, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -342,7 +345,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
// the series which don't have the label name set too. See:
|
// the series which don't have the label name set too. See:
|
||||||
// https://github.com/prometheus/prometheus/issues/3575 and
|
// https://github.com/prometheus/prometheus/issues/3575 and
|
||||||
// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
|
// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
|
||||||
it, err := inversePostingsForMatcher(ix, m)
|
it, err := inversePostingsForMatcher(ctx, ix, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -359,23 +362,23 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
||||||
return it, nil
|
return it, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||||
// This method will not return postings for missing labels.
|
// This method will not return postings for missing labels.
|
||||||
|
|
||||||
// Fast-path for equal matching.
|
// Fast-path for equal matching.
|
||||||
if m.Type == labels.MatchEqual {
|
if m.Type == labels.MatchEqual {
|
||||||
return ix.Postings(context.TODO(), m.Name, m.Value)
|
return ix.Postings(ctx, m.Name, m.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fast-path for set matching.
|
// Fast-path for set matching.
|
||||||
if m.Type == labels.MatchRegexp {
|
if m.Type == labels.MatchRegexp {
|
||||||
setMatches := findSetMatches(m.GetRegexString())
|
setMatches := findSetMatches(m.GetRegexString())
|
||||||
if len(setMatches) > 0 {
|
if len(setMatches) > 0 {
|
||||||
return ix.Postings(context.TODO(), m.Name, setMatches...)
|
return ix.Postings(ctx, m.Name, setMatches...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vals, err := ix.LabelValues(context.TODO(), m.Name)
|
vals, err := ix.LabelValues(ctx, m.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -391,28 +394,28 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
|
||||||
return index.EmptyPostings(), nil
|
return index.EmptyPostings(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return ix.Postings(context.TODO(), m.Name, res...)
|
return ix.Postings(ctx, m.Name, res...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
|
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
|
||||||
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||||
// Fast-path for MatchNotRegexp matching.
|
// Fast-path for MatchNotRegexp matching.
|
||||||
// Inverse of a MatchNotRegexp is MatchRegexp (double negation).
|
// Inverse of a MatchNotRegexp is MatchRegexp (double negation).
|
||||||
// Fast-path for set matching.
|
// Fast-path for set matching.
|
||||||
if m.Type == labels.MatchNotRegexp {
|
if m.Type == labels.MatchNotRegexp {
|
||||||
setMatches := findSetMatches(m.GetRegexString())
|
setMatches := findSetMatches(m.GetRegexString())
|
||||||
if len(setMatches) > 0 {
|
if len(setMatches) > 0 {
|
||||||
return ix.Postings(context.TODO(), m.Name, setMatches...)
|
return ix.Postings(ctx, m.Name, setMatches...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fast-path for MatchNotEqual matching.
|
// Fast-path for MatchNotEqual matching.
|
||||||
// Inverse of a MatchNotEqual is MatchEqual (double negation).
|
// Inverse of a MatchNotEqual is MatchEqual (double negation).
|
||||||
if m.Type == labels.MatchNotEqual {
|
if m.Type == labels.MatchNotEqual {
|
||||||
return ix.Postings(context.TODO(), m.Name, m.Value)
|
return ix.Postings(ctx, m.Name, m.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
vals, err := ix.LabelValues(context.TODO(), m.Name)
|
vals, err := ix.LabelValues(ctx, m.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -429,11 +432,11 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ix.Postings(context.TODO(), m.Name, res...)
|
return ix.Postings(ctx, m.Name, res...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||||
p, err := PostingsForMatchers(r, matchers...)
|
p, err := PostingsForMatchers(ctx, r, matchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "fetching postings for matchers")
|
return nil, errors.Wrap(err, "fetching postings for matchers")
|
||||||
}
|
}
|
||||||
|
@ -483,7 +486,7 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelNamesWithMatchers(ctx context.Context, r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
|
func labelNamesWithMatchers(ctx context.Context, r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
|
||||||
p, err := PostingsForMatchers(r, matchers...)
|
p, err := PostingsForMatchers(ctx, r, matchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,6 +91,8 @@ func BenchmarkQuerier(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
|
n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
|
||||||
nX := labels.MustNewMatcher(labels.MatchEqual, "n", "X"+postingsBenchSuffix)
|
nX := labels.MustNewMatcher(labels.MatchEqual, "n", "X"+postingsBenchSuffix)
|
||||||
|
|
||||||
|
@ -166,7 +168,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
_, err := PostingsForMatchers(ir, c.matchers...)
|
_, err := PostingsForMatchers(ctx, ir, c.matchers...)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -1898,6 +1898,8 @@ func TestFindSetMatches(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPostingsForMatchers(t *testing.T) {
|
func TestPostingsForMatchers(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
chunkDir := t.TempDir()
|
chunkDir := t.TempDir()
|
||||||
opts := DefaultHeadOptions()
|
opts := DefaultHeadOptions()
|
||||||
opts.ChunkRange = 1000
|
opts.ChunkRange = 1000
|
||||||
|
@ -2176,7 +2178,7 @@ func TestPostingsForMatchers(t *testing.T) {
|
||||||
for _, l := range c.exp {
|
for _, l := range c.exp {
|
||||||
exp[l.String()] = struct{}{}
|
exp[l.String()] = struct{}{}
|
||||||
}
|
}
|
||||||
p, err := PostingsForMatchers(ir, c.matchers...)
|
p, err := PostingsForMatchers(ctx, ir, c.matchers...)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var builder labels.ScratchBuilder
|
var builder labels.ScratchBuilder
|
||||||
|
@ -2471,6 +2473,8 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPostingsForMatcher(t *testing.T) {
|
func TestPostingsForMatcher(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
matcher *labels.Matcher
|
matcher *labels.Matcher
|
||||||
hasError bool
|
hasError bool
|
||||||
|
@ -2498,7 +2502,7 @@ func TestPostingsForMatcher(t *testing.T) {
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
ir := &mockMatcherIndex{}
|
ir := &mockMatcherIndex{}
|
||||||
_, err := postingsForMatcher(ir, tc.matcher)
|
_, err := postingsForMatcher(ctx, ir, tc.matcher)
|
||||||
if tc.hasError {
|
if tc.hasError {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue