mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
tsdb: check for context cancel before regex matching postings (#14096)
* tsdb: check for context cancel before regex matching postings Regex matching can be heavy if the regex takes a lot of cycles to evaluate and we can get stuck evaluating postings for a long time without this fix. The constant checkContextEveryNIterations=100 may be changed later. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
e6be4240be
commit
fdaafdb041
|
@ -51,6 +51,9 @@ const (
|
||||||
indexFilename = "index"
|
indexFilename = "index"
|
||||||
|
|
||||||
seriesByteAlign = 16
|
seriesByteAlign = 16
|
||||||
|
|
||||||
|
// checkContextEveryNIterations is used in some tight loops to check if the context is done.
|
||||||
|
checkContextEveryNIterations = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
type indexWriterSeries struct {
|
type indexWriterSeries struct {
|
||||||
|
@ -1797,7 +1800,12 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
|
||||||
}
|
}
|
||||||
|
|
||||||
var its []Postings
|
var its []Postings
|
||||||
|
count := 1
|
||||||
for val, offset := range e {
|
for val, offset := range e {
|
||||||
|
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
|
||||||
|
return ErrPostings(ctx.Err())
|
||||||
|
}
|
||||||
|
count++
|
||||||
if !match(val) {
|
if !match(val) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -719,3 +719,36 @@ func TestChunksTimeOrdering(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, idx.Close())
|
require.NoError(t, idx.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
seriesCount := 1000
|
||||||
|
for i := 1; i <= seriesCount; i++ {
|
||||||
|
require.NoError(t, idx.AddSymbol(fmt.Sprintf("%4d", i)))
|
||||||
|
}
|
||||||
|
require.NoError(t, idx.AddSymbol("__name__"))
|
||||||
|
|
||||||
|
for i := 1; i <= seriesCount; i++ {
|
||||||
|
require.NoError(t, idx.AddSeries(storage.SeriesRef(i), labels.FromStrings("__name__", fmt.Sprintf("%4d", i)),
|
||||||
|
chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, idx.Close())
|
||||||
|
|
||||||
|
ir, err := NewFileReader(filepath.Join(dir, "index"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ir.Close()
|
||||||
|
|
||||||
|
failAfter := uint64(seriesCount / 2) // Fail after processing half of the series.
|
||||||
|
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
|
||||||
|
p := ir.PostingsForLabelMatching(ctx, "__name__", func(string) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
require.Error(t, p.Err())
|
||||||
|
require.Equal(t, failAfter, ctx.Count())
|
||||||
|
}
|
||||||
|
|
|
@ -416,7 +416,12 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
||||||
}
|
}
|
||||||
|
|
||||||
var its []Postings
|
var its []Postings
|
||||||
|
count := 1
|
||||||
for _, v := range vals {
|
for _, v := range vals {
|
||||||
|
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
|
||||||
|
return ErrPostings(ctx.Err())
|
||||||
|
}
|
||||||
|
count++
|
||||||
if match(v) {
|
if match(v) {
|
||||||
its = append(its, NewListPostings(e[v]))
|
its = append(its, NewListPostings(e[v]))
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMemPostings_addFor(t *testing.T) {
|
func TestMemPostings_addFor(t *testing.T) {
|
||||||
|
@ -1282,3 +1283,19 @@ func BenchmarkListPostings(b *testing.B) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
|
||||||
|
memP := NewMemPostings()
|
||||||
|
seriesCount := 10 * checkContextEveryNIterations
|
||||||
|
for i := 1; i <= seriesCount; i++ {
|
||||||
|
memP.Add(storage.SeriesRef(i), labels.FromStrings("__name__", fmt.Sprintf("%4d", i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
failAfter := uint64(seriesCount / 2 / checkContextEveryNIterations)
|
||||||
|
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
|
||||||
|
p := memP.PostingsForLabelMatching(ctx, "__name__", func(string) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
require.Error(t, p.Err())
|
||||||
|
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
|
||||||
|
}
|
||||||
|
|
|
@ -33,6 +33,9 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/annotations"
|
"github.com/prometheus/prometheus/util/annotations"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// checkContextEveryNIterations is used in some tight loops to check if the context is done.
|
||||||
|
const checkContextEveryNIterations = 100
|
||||||
|
|
||||||
type blockBaseQuerier struct {
|
type blockBaseQuerier struct {
|
||||||
blockID ulid.ULID
|
blockID ulid.ULID
|
||||||
index IndexReader
|
index IndexReader
|
||||||
|
@ -358,7 +361,12 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Ma
|
||||||
if m.Type == labels.MatchEqual && m.Value == "" {
|
if m.Type == labels.MatchEqual && m.Value == "" {
|
||||||
res = vals
|
res = vals
|
||||||
} else {
|
} else {
|
||||||
|
count := 1
|
||||||
for _, val := range vals {
|
for _, val := range vals {
|
||||||
|
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
count++
|
||||||
if !m.Matches(val) {
|
if !m.Matches(val) {
|
||||||
res = append(res, val)
|
res = append(res, val)
|
||||||
}
|
}
|
||||||
|
@ -387,7 +395,12 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
|
||||||
// re-use the allValues slice to avoid allocations
|
// re-use the allValues slice to avoid allocations
|
||||||
// this is safe because the iteration is always ahead of the append
|
// this is safe because the iteration is always ahead of the append
|
||||||
filteredValues := allValues[:0]
|
filteredValues := allValues[:0]
|
||||||
|
count := 1
|
||||||
for _, v := range allValues {
|
for _, v := range allValues {
|
||||||
|
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
count++
|
||||||
if m.Matches(v) {
|
if m.Matches(v) {
|
||||||
filteredValues = append(filteredValues, v)
|
filteredValues = append(filteredValues, v)
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
"github.com/prometheus/prometheus/util/annotations"
|
"github.com/prometheus/prometheus/util/annotations"
|
||||||
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO(bwplotka): Replace those mocks with remote.concreteSeriesSet.
|
// TODO(bwplotka): Replace those mocks with remote.concreteSeriesSet.
|
||||||
|
@ -3638,3 +3639,77 @@ func TestQueryWithOneChunkCompletelyDeleted(t *testing.T) {
|
||||||
require.NoError(t, css.Err())
|
require.NoError(t, css.Err())
|
||||||
require.Equal(t, 1, seriesCount)
|
require.Equal(t, 1, seriesCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
|
||||||
|
ir := mockReaderOfLabels{}
|
||||||
|
|
||||||
|
failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
|
||||||
|
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
|
||||||
|
_, err := labelValuesWithMatchers(ctx, ir, "__name__", labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
|
||||||
|
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
|
||||||
|
ir := mockReaderOfLabels{}
|
||||||
|
|
||||||
|
failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
|
||||||
|
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
|
||||||
|
_, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||||
|
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockReaderOfLabels struct{}
|
||||||
|
|
||||||
|
const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
|
||||||
|
return make([]string, mockReaderOfLabelsSeriesCount), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) {
|
||||||
|
panic("LabelValueFor called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) SortedLabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
|
||||||
|
panic("SortedLabelValues called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) LabelNames(context.Context, ...*labels.Matcher) ([]string, error) {
|
||||||
|
panic("LabelNames called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) LabelNamesFor(context.Context, ...storage.SeriesRef) ([]string, error) {
|
||||||
|
panic("LabelNamesFor called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
|
||||||
|
panic("PostingsForLabelMatching called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) {
|
||||||
|
panic("Postings called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) ShardedPostings(index.Postings, uint64, uint64) index.Postings {
|
||||||
|
panic("Postings called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) SortedPostings(index.Postings) index.Postings {
|
||||||
|
panic("SortedPostings called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) Series(storage.SeriesRef, *labels.ScratchBuilder, *[]chunks.Meta) error {
|
||||||
|
panic("Series called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockReaderOfLabels) Symbols() index.StringIter {
|
||||||
|
panic("Series called")
|
||||||
|
}
|
||||||
|
|
|
@ -13,7 +13,12 @@
|
||||||
|
|
||||||
package testutil
|
package testutil
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
// A MockContext provides a simple stub implementation of a Context.
|
// A MockContext provides a simple stub implementation of a Context.
|
||||||
type MockContext struct {
|
type MockContext struct {
|
||||||
|
@ -40,3 +45,23 @@ func (c *MockContext) Err() error {
|
||||||
func (c *MockContext) Value(interface{}) interface{} {
|
func (c *MockContext) Value(interface{}) interface{} {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MockContextErrAfter is a MockContext that will return an error after a certain
|
||||||
|
// number of calls to Err().
|
||||||
|
type MockContextErrAfter struct {
|
||||||
|
MockContext
|
||||||
|
count atomic.Uint64
|
||||||
|
FailAfter uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *MockContextErrAfter) Err() error {
|
||||||
|
c.count.Inc()
|
||||||
|
if c.count.Load() >= c.FailAfter {
|
||||||
|
return context.Canceled
|
||||||
|
}
|
||||||
|
return c.MockContext.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *MockContextErrAfter) Count() uint64 {
|
||||||
|
return c.count.Load()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue