Allow forcing usage of PostingsForMatchersCache

When out-of-order ingestion is enabled, queries go through both the Head
and the OOOHead, and both execute the same PostingsForMatchers call,
since memSeries are shared between them.

In some cases these calls can be both heavy and frequent. We can
deduplicate them by using the PostingsForMatchers cache that we already
use for query sharding.

Note that using this cache means a newly appended series can be missing
from query results for up to the configured TTL.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
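
As an illustration, here is a minimal sketch of how a caller could enable the forced cache through the new DB options introduced in this change. The option names come from the diff below; the tsdb.Open call shape, the TTL and size values, and the data directory are illustrative assumptions rather than part of this commit (the fields exist in this TSDB fork, not upstream Prometheus).

package main

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Cache PostingsForMatchers results in the Head for a short TTL, and force
	// the cache even for calls that don't pass the `concurrent` hint, so Head
	// and OOOHead lookups for the same matchers are computed only once.
	// A TTL of 0 would keep only the in-flight deduplication, dropping results
	// as soon as the first caller returns.
	opts.HeadPostingsForMatchersCacheTTL = 10 * time.Second // illustrative value
	opts.HeadPostingsForMatchersCacheSize = 100             // illustrative value
	opts.HeadPostingsForMatchersCacheForce = true

	db, err := tsdb.Open("data", log.NewNopLogger(), nil, opts, tsdb.NewDBStats())
	if err != nil {
		panic(err)
	}
	defer db.Close()
}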

@@ -351,7 +351,7 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache
if err != nil {
return nil, err
}
pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize)
pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
closers = append(closers, ir)

@@ -72,20 +72,23 @@ var ErrNotReady = errors.New("TSDB not ready")
// millisecond precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
WALCompression: false,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
WALCompression: false,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
HeadPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
HeadPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
HeadPostingsForMatchersCacheForce: false,
}
}
@@ -189,6 +192,15 @@ type Options struct {
// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
// If it is <=0, the default value is assumed.
OutOfOrderCapMax int64
// HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head.
// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
HeadPostingsForMatchersCacheTTL time.Duration
// HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head.
// It's ignored when HeadPostingsForMatchersCacheTTL is 0.
HeadPostingsForMatchersCacheSize int
// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
HeadPostingsForMatchersCacheForce bool
}
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -798,6 +810,9 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.PostingsForMatchersCacheTTL = opts.HeadPostingsForMatchersCacheTTL
headOpts.PostingsForMatchersCacheSize = opts.HeadPostingsForMatchersCacheSize
headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce
if opts.IsolationDisabled {
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts.IsolationDisabled = opts.IsolationDisabled

@@ -170,6 +170,10 @@ type HeadOptions struct {
EnableMemorySnapshotOnShutdown bool
IsolationDisabled bool
PostingsForMatchersCacheTTL time.Duration
PostingsForMatchersCacheSize int
PostingsForMatchersCacheForce bool
}
const (
@@ -179,15 +183,18 @@ const (
func DefaultHeadOptions() *HeadOptions {
ho := &HeadOptions{
ChunkRange: DefaultBlockDuration,
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
ChunkRange: DefaultBlockDuration,
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
PostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
PostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
PostingsForMatchersCacheForce: false,
}
ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
return ho
@@ -254,7 +261,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
stats: stats,
reg: r,
pfmc: NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize),
pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheSize, opts.PostingsForMatchersCacheForce),
}
if err := h.resetInMemoryState(); err != nil {
return nil, err
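
For code that constructs a Head directly instead of going through tsdb.Open, the same knobs are now available on HeadOptions. A hedged sketch: the full NewHead signature is truncated in the hunk header above, so the trailing stats argument, the nil WALs and the chunk directory here are assumptions made for illustration.

package main

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultHeadOptions()
	opts.ChunkDirRoot = "data/chunks_head" // illustrative path
	// Same cache settings as the DB-level options, applied directly to the Head.
	opts.PostingsForMatchersCacheTTL = 10 * time.Second
	opts.PostingsForMatchersCacheSize = 100
	opts.PostingsForMatchersCacheForce = true

	// nil registerer and nil WAL/WBL keep the example small.
	head, err := tsdb.NewHead(nil, log.NewNopLogger(), nil, nil, opts, tsdb.NewHeadStats())
	if err != nil {
		panic(err)
	}
	defer head.Close()
}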

@@ -27,14 +27,17 @@ type IndexPostingsReader interface {
Postings(name string, values ...string) (index.Postings, error)
}
// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForMatchersCache {
// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
// If `ttl` is 0, then it only deduplicates in-flight requests.
// If `force` is true, then all requests go through the cache, regardless of the `concurrent` param provided.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *PostingsForMatchersCache {
b := &PostingsForMatchersCache{
calls: &sync.Map{},
cached: list.New(),
ttl: ttl,
cacheSize: cacheSize,
force: force,
timeNow: time.Now,
postingsForMatchers: PostingsForMatchers,
@@ -43,7 +46,7 @@ func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForM
return b
}
// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in.
// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in or force is true.
type PostingsForMatchersCache struct {
calls *sync.Map
@@ -52,6 +55,7 @@ type PostingsForMatchersCache struct {
ttl time.Duration
cacheSize int
force bool
// timeNow is the time.Now that can be replaced for testing purposes
timeNow func() time.Time
@@ -60,7 +64,7 @@ type PostingsForMatchersCache struct {
}
func (c *PostingsForMatchersCache) PostingsForMatchers(ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
if !concurrent {
if !concurrent && !c.force {
return c.postingsForMatchers(ix, ms...)
}
c.expire()
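
The gate above is the whole behavioural change: without force, only callers passing the concurrent hint share results; with force, every Head and OOOHead call does. As a self-contained illustration of the in-flight deduplication the cache is built on (the constructor above keeps a sync.Map of calls), here is a simplified stand-in rather than the actual implementation; the dedupCache type, the key format and the string results are invented for the example.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dedupCache collapses concurrent calls that share a key into a single
// execution of fn, and hands every caller the same result. This mirrors the
// in-flight deduplication PostingsForMatchersCache applies to identical
// matcher sets; the real cache additionally expires entries after a TTL and
// bounds the number of cached keys.
type dedupCache struct {
	calls sync.Map // key -> *call
}

type call struct {
	once sync.Once
	res  string
	err  error
}

func (c *dedupCache) do(key string, fn func() (string, error)) (string, error) {
	v, _ := c.calls.LoadOrStore(key, &call{})
	cl := v.(*call)
	cl.once.Do(func() { cl.res, cl.err = fn() })
	return cl.res, cl.err
}

func main() {
	var executions atomic.Int32
	c := &dedupCache{}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Ten goroutines ask for the same "matchers"; fn runs exactly once.
			_, _ = c.do(`{foo="bar"}`, func() (string, error) {
				executions.Add(1)
				return "postings for foo=bar", nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("executions:", executions.Load()) // prints 1
}

In the real cache the key is derived from the matchers, the result is an index.Postings, and entries are evicted by TTL and by the cache size limit.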

@@ -16,8 +16,8 @@ import (
func TestPostingsForMatchersCache(t *testing.T) {
const testCacheSize = 5
// newPostingsForMatchersCache tests the NewPostingsForMatchersCache constructor, but overrides the postingsForMatchers func
newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, testCacheSize)
newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, testCacheSize, force)
if c.postingsForMatchers == nil {
t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")
}
@@ -36,7 +36,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
return index.ErrPostings(expectedPostingsErr), nil
}, &timeNowMock{})
}, &timeNowMock{}, false)
p, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, expectedMatchers...)
require.NoError(t, err)
@@ -52,7 +52,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
return nil, expectedErr
}, &timeNowMock{})
}, &timeNowMock{}, false)
_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.Equal(t, expectedErr, err)
@@ -61,76 +61,81 @@ func TestPostingsForMatchersCache(t *testing.T) {
t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) {
for _, cacheEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("cacheEnabled=%t", cacheEnabled), func(t *testing.T) {
calls := [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same
{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type
{labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name
{labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same
}
// we'll identify results by each call's error, and the error will be the string value of the first matcher
matchersString := func(ms []*labels.Matcher) string {
s := strings.Builder{}
for i, m := range ms {
if i > 0 {
s.WriteByte(',')
for _, forced := range []bool{true, false} {
concurrent := !forced
t.Run(fmt.Sprintf("forced=%t", forced), func(t *testing.T) {
calls := [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same
{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type
{labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name
{labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same
}
s.WriteString(m.String())
}
return s.String()
}
expectedResults := make([]string, len(calls))
for i, c := range calls {
expectedResults[i] = c[0].String()
}
expectedPostingsForMatchersCalls := 5
// we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic
called := make(chan struct{}, expectedPostingsForMatchersCalls)
release := make(chan struct{})
var ttl time.Duration
if cacheEnabled {
ttl = defaultPostingsForMatchersCacheTTL
}
c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
select {
case called <- struct{}{}:
default:
}
<-release
return nil, fmt.Errorf(matchersString(ms))
}, &timeNowMock{})
// we'll identify results by each call's error, and the error will be the string value of the first matcher
matchersString := func(ms []*labels.Matcher) string {
s := strings.Builder{}
for i, m := range ms {
if i > 0 {
s.WriteByte(',')
}
s.WriteString(m.String())
}
return s.String()
}
expectedResults := make([]string, len(calls))
for i, c := range calls {
expectedResults[i] = c[0].String()
}
results := make([]string, len(calls))
resultsWg := sync.WaitGroup{}
resultsWg.Add(len(calls))
expectedPostingsForMatchersCalls := 5
// we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic
called := make(chan struct{}, expectedPostingsForMatchersCalls)
release := make(chan struct{})
var ttl time.Duration
if cacheEnabled {
ttl = defaultPostingsForMatchersCacheTTL
}
c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
select {
case called <- struct{}{}:
default:
}
<-release
return nil, fmt.Errorf(matchersString(ms))
}, &timeNowMock{}, forced)
// perform all calls
for i := 0; i < len(calls); i++ {
go func(i int) {
_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, calls[i]...)
results[i] = err.Error()
resultsWg.Done()
}(i)
}
results := make([]string, len(calls))
resultsWg := sync.WaitGroup{}
resultsWg.Add(len(calls))
// wait until all calls arrive to the mocked function
for i := 0; i < expectedPostingsForMatchersCalls; i++ {
<-called
}
// perform all calls
for i := 0; i < len(calls); i++ {
go func(i int) {
_, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, calls[i]...)
results[i] = err.Error()
resultsWg.Done()
}(i)
}
// let them all return
close(release)
// wait until all calls arrive to the mocked function
for i := 0; i < expectedPostingsForMatchersCalls; i++ {
<-called
}
// wait for the results
resultsWg.Wait()
// let them all return
close(release)
// check that we got correct results
for i, c := range calls {
require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
// wait for the results
resultsWg.Wait()
// check that we got correct results
for i, c := range calls {
require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
}
})
}
})
}
@@ -143,7 +148,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{})
}, &timeNowMock{}, false)
// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...)
@@ -163,7 +168,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{})
}, &timeNowMock{}, false)
// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
@@ -186,7 +191,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, timeNow)
}, timeNow, false)
// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
@@ -220,7 +225,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
k := matchersKey(ms)
callsPerMatchers[k]++
return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
}, timeNow)
}, timeNow, false)
// each one of the first testCacheSize calls is cached properly
for _, matchers := range calls {