mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Add context argument to tsdb.PostingsForMatchers
Signed-off-by: Alan Protasio <alanprot@gmail.com>
This commit is contained in:
parent
69edd8709b
commit
959c98441b
|
@ -544,7 +544,7 @@ func (r blockChunkReader) Close() error {
|
|||
}
|
||||
|
||||
// Delete matching series between mint and maxt in the block.
|
||||
func (pb *Block) Delete(_ context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
|
||||
func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
|
||||
pb.mtx.Lock()
|
||||
defer pb.mtx.Unlock()
|
||||
|
||||
|
@ -552,7 +552,7 @@ func (pb *Block) Delete(_ context.Context, mint, maxt int64, ms ...*labels.Match
|
|||
return ErrClosing
|
||||
}
|
||||
|
||||
p, err := PostingsForMatchers(pb.indexr, ms...)
|
||||
p, err := PostingsForMatchers(ctx, pb.indexr, ms...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "select series")
|
||||
}
|
||||
|
|
|
@ -1433,7 +1433,7 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match
|
|||
|
||||
ir := h.indexRange(mint, maxt)
|
||||
|
||||
p, err := PostingsForMatchers(ir, ms...)
|
||||
p, err := PostingsForMatchers(ctx, ir, ms...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "select series")
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// nolint:revive // Many unsued function arguments in this file by design.
|
||||
// nolint:revive // Many unused function arguments in this file by design.
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
|
|
@ -127,12 +127,12 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
|||
return &blockQuerier{blockBaseQuerier: q}, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
||||
func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
||||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
disableTrimming := false
|
||||
|
||||
p, err := PostingsForMatchers(q.index, ms...)
|
||||
p, err := PostingsForMatchers(ctx, q.index, ms...)
|
||||
if err != nil {
|
||||
return storage.ErrSeriesSet(err)
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
|
|||
return &blockChunkQuerier{blockBaseQuerier: q}, nil
|
||||
}
|
||||
|
||||
func (q *blockChunkQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
disableTrimming := false
|
||||
|
@ -176,7 +176,7 @@ func (q *blockChunkQuerier) Select(_ context.Context, sortSeries bool, hints *st
|
|||
maxt = hints.End
|
||||
disableTrimming = hints.DisableTrimming
|
||||
}
|
||||
p, err := PostingsForMatchers(q.index, ms...)
|
||||
p, err := PostingsForMatchers(ctx, q.index, ms...)
|
||||
if err != nil {
|
||||
return storage.ErrChunkSeriesSet(err)
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ func findSetMatches(pattern string) []string {
|
|||
|
||||
// PostingsForMatchers assembles a single postings iterator against the index reader
|
||||
// based on the given matchers. The resulting postings are not ordered by series.
|
||||
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
||||
func PostingsForMatchers(ctx context.Context, ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
||||
var its, notIts []index.Postings
|
||||
// See which label must be non-empty.
|
||||
// Optimization for case like {l=~".", l!="1"}.
|
||||
|
@ -267,7 +267,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
// We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
|
||||
// doesn't include series that may be added to the index reader during this function call.
|
||||
k, v := index.AllPostingsKey()
|
||||
allPostings, err := ix.Postings(context.TODO(), k, v)
|
||||
allPostings, err := ix.Postings(ctx, k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -284,10 +284,13 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
})
|
||||
|
||||
for _, m := range ms {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
switch {
|
||||
case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least.
|
||||
k, v := index.AllPostingsKey()
|
||||
allPostings, err := ix.Postings(context.TODO(), k, v)
|
||||
allPostings, err := ix.Postings(ctx, k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -305,7 +308,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
it, err := postingsForMatcher(ix, inverse)
|
||||
it, err := postingsForMatcher(ctx, ix, inverse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -318,7 +321,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
it, err := inversePostingsForMatcher(ix, inverse)
|
||||
it, err := inversePostingsForMatcher(ctx, ix, inverse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -328,7 +331,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
its = append(its, it)
|
||||
default: // l="a"
|
||||
// Non-Not matcher, use normal postingsForMatcher.
|
||||
it, err := postingsForMatcher(ix, m)
|
||||
it, err := postingsForMatcher(ctx, ix, m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -342,7 +345,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
// the series which don't have the label name set too. See:
|
||||
// https://github.com/prometheus/prometheus/issues/3575 and
|
||||
// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
|
||||
it, err := inversePostingsForMatcher(ix, m)
|
||||
it, err := inversePostingsForMatcher(ctx, ix, m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -359,23 +362,23 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
return it, nil
|
||||
}
|
||||
|
||||
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||
func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||
// This method will not return postings for missing labels.
|
||||
|
||||
// Fast-path for equal matching.
|
||||
if m.Type == labels.MatchEqual {
|
||||
return ix.Postings(context.TODO(), m.Name, m.Value)
|
||||
return ix.Postings(ctx, m.Name, m.Value)
|
||||
}
|
||||
|
||||
// Fast-path for set matching.
|
||||
if m.Type == labels.MatchRegexp {
|
||||
setMatches := findSetMatches(m.GetRegexString())
|
||||
if len(setMatches) > 0 {
|
||||
return ix.Postings(context.TODO(), m.Name, setMatches...)
|
||||
return ix.Postings(ctx, m.Name, setMatches...)
|
||||
}
|
||||
}
|
||||
|
||||
vals, err := ix.LabelValues(context.TODO(), m.Name)
|
||||
vals, err := ix.LabelValues(ctx, m.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -391,28 +394,28 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
|
|||
return index.EmptyPostings(), nil
|
||||
}
|
||||
|
||||
return ix.Postings(context.TODO(), m.Name, res...)
|
||||
return ix.Postings(ctx, m.Name, res...)
|
||||
}
|
||||
|
||||
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
|
||||
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||
func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||
// Fast-path for MatchNotRegexp matching.
|
||||
// Inverse of a MatchNotRegexp is MatchRegexp (double negation).
|
||||
// Fast-path for set matching.
|
||||
if m.Type == labels.MatchNotRegexp {
|
||||
setMatches := findSetMatches(m.GetRegexString())
|
||||
if len(setMatches) > 0 {
|
||||
return ix.Postings(context.TODO(), m.Name, setMatches...)
|
||||
return ix.Postings(ctx, m.Name, setMatches...)
|
||||
}
|
||||
}
|
||||
|
||||
// Fast-path for MatchNotEqual matching.
|
||||
// Inverse of a MatchNotEqual is MatchEqual (double negation).
|
||||
if m.Type == labels.MatchNotEqual {
|
||||
return ix.Postings(context.TODO(), m.Name, m.Value)
|
||||
return ix.Postings(ctx, m.Name, m.Value)
|
||||
}
|
||||
|
||||
vals, err := ix.LabelValues(context.TODO(), m.Name)
|
||||
vals, err := ix.LabelValues(ctx, m.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -429,11 +432,11 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
|
|||
}
|
||||
}
|
||||
|
||||
return ix.Postings(context.TODO(), m.Name, res...)
|
||||
return ix.Postings(ctx, m.Name, res...)
|
||||
}
|
||||
|
||||
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
p, err := PostingsForMatchers(r, matchers...)
|
||||
p, err := PostingsForMatchers(ctx, r, matchers...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching postings for matchers")
|
||||
}
|
||||
|
@ -483,7 +486,7 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
|
|||
}
|
||||
|
||||
func labelNamesWithMatchers(ctx context.Context, r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
|
||||
p, err := PostingsForMatchers(r, matchers...)
|
||||
p, err := PostingsForMatchers(ctx, r, matchers...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -91,6 +91,8 @@ func BenchmarkQuerier(b *testing.B) {
|
|||
}
|
||||
|
||||
func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
||||
ctx := context.Background()
|
||||
|
||||
n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
|
||||
nX := labels.MustNewMatcher(labels.MatchEqual, "n", "X"+postingsBenchSuffix)
|
||||
|
||||
|
@ -166,7 +168,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
|
|||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := PostingsForMatchers(ir, c.matchers...)
|
||||
_, err := PostingsForMatchers(ctx, ir, c.matchers...)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -1898,6 +1898,8 @@ func TestFindSetMatches(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestPostingsForMatchers(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
chunkDir := t.TempDir()
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
|
@ -2176,7 +2178,7 @@ func TestPostingsForMatchers(t *testing.T) {
|
|||
for _, l := range c.exp {
|
||||
exp[l.String()] = struct{}{}
|
||||
}
|
||||
p, err := PostingsForMatchers(ir, c.matchers...)
|
||||
p, err := PostingsForMatchers(ctx, ir, c.matchers...)
|
||||
require.NoError(t, err)
|
||||
|
||||
var builder labels.ScratchBuilder
|
||||
|
@ -2471,6 +2473,8 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str
|
|||
}
|
||||
|
||||
func TestPostingsForMatcher(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
cases := []struct {
|
||||
matcher *labels.Matcher
|
||||
hasError bool
|
||||
|
@ -2498,7 +2502,7 @@ func TestPostingsForMatcher(t *testing.T) {
|
|||
|
||||
for _, tc := range cases {
|
||||
ir := &mockMatcherIndex{}
|
||||
_, err := postingsForMatcher(ir, tc.matcher)
|
||||
_, err := postingsForMatcher(ctx, ir, tc.matcher)
|
||||
if tc.hasError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
|
|
Loading…
Reference in a new issue