diff --git a/tsdb/block.go b/tsdb/block.go index 2c77a6fcf9..fe542fc757 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -351,7 +351,7 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache if err != nil { return nil, err } - pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize) + pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false) ir := indexReaderWithPostingsForMatchers{indexReader, pfmc} closers = append(closers, ir) diff --git a/tsdb/db.go b/tsdb/db.go index 8a65c429ce..5c402fe389 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -72,20 +72,23 @@ var ErrNotReady = errors.New("TSDB not ready") // millisecond precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, - MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, - RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), - MinBlockDuration: DefaultBlockDuration, - MaxBlockDuration: DefaultBlockDuration, - NoLockfile: false, - AllowOverlappingCompaction: true, - WALCompression: false, - StripeSize: DefaultStripeSize, - HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, - IsolationDisabled: defaultIsolationDisabled, - HeadChunksEndTimeVariance: 0, - HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, - OutOfOrderCapMax: DefaultOutOfOrderCapMax, + WALSegmentSize: wlog.DefaultSegmentSize, + MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, + RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), + MinBlockDuration: DefaultBlockDuration, + MaxBlockDuration: DefaultBlockDuration, + NoLockfile: false, + AllowOverlappingCompaction: true, + WALCompression: false, + StripeSize: DefaultStripeSize, + HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, + IsolationDisabled: defaultIsolationDisabled, + HeadChunksEndTimeVariance: 0, + HeadChunksWriteQueueSize: 
chunks.DefaultWriteQueueSize, + OutOfOrderCapMax: DefaultOutOfOrderCapMax, + HeadPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL, + HeadPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize, + HeadPostingsForMatchersCacheForce: false, } } @@ -189,6 +192,15 @@ type Options struct { // OutOfOrderCapMax is maximum capacity for OOO chunks (in samples). // If it is <=0, the default value is assumed. OutOfOrderCapMax int64 + + // HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head. + // If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished. + HeadPostingsForMatchersCacheTTL time.Duration + // HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head. + // It's ignored when HeadPostingsForMatchersCacheTTL is 0. + HeadPostingsForMatchersCacheSize int + // HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param. + HeadPostingsForMatchersCacheForce bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -798,6 +810,9 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms) headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow) headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) + headOpts.PostingsForMatchersCacheTTL = opts.HeadPostingsForMatchersCacheTTL + headOpts.PostingsForMatchersCacheSize = opts.HeadPostingsForMatchersCacheSize + headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce if opts.IsolationDisabled { // We only override this flag if isolation is disabled at DB level. We use the default otherwise. 
headOpts.IsolationDisabled = opts.IsolationDisabled diff --git a/tsdb/head.go b/tsdb/head.go index 3c4013e18c..ef2b113370 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -170,6 +170,10 @@ type HeadOptions struct { EnableMemorySnapshotOnShutdown bool IsolationDisabled bool + + PostingsForMatchersCacheTTL time.Duration + PostingsForMatchersCacheSize int + PostingsForMatchersCacheForce bool } const ( @@ -179,15 +183,18 @@ const ( func DefaultHeadOptions() *HeadOptions { ho := &HeadOptions{ - ChunkRange: DefaultBlockDuration, - ChunkDirRoot: "", - ChunkPool: chunkenc.NewPool(), - ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, - ChunkEndTimeVariance: 0, - ChunkWriteQueueSize: chunks.DefaultWriteQueueSize, - StripeSize: DefaultStripeSize, - SeriesCallback: &noopSeriesLifecycleCallback{}, - IsolationDisabled: defaultIsolationDisabled, + ChunkRange: DefaultBlockDuration, + ChunkDirRoot: "", + ChunkPool: chunkenc.NewPool(), + ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, + ChunkEndTimeVariance: 0, + ChunkWriteQueueSize: chunks.DefaultWriteQueueSize, + StripeSize: DefaultStripeSize, + SeriesCallback: &noopSeriesLifecycleCallback{}, + IsolationDisabled: defaultIsolationDisabled, + PostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL, + PostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize, + PostingsForMatchersCacheForce: false, } ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax) return ho @@ -254,7 +261,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea stats: stats, reg: r, - pfmc: NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize), + pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheSize, opts.PostingsForMatchersCacheForce), } if err := h.resetInMemoryState(); err != nil { return nil, err diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 086c72e4a0..8892d7c2ef 
100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -27,14 +27,17 @@ type IndexPostingsReader interface { Postings(name string, values ...string) (index.Postings, error) } -// NewPostingsForMatchersCache creates a new PostingsForMatchersCache. -func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForMatchersCache { +// NewPostingsForMatchersCache creates a new PostingsForMatchersCache. +// If `ttl` is 0, then it only deduplicates in-flight requests. +// If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided. +func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *PostingsForMatchersCache { b := &PostingsForMatchersCache{ calls: &sync.Map{}, cached: list.New(), ttl: ttl, cacheSize: cacheSize, + force: force, timeNow: time.Now, postingsForMatchers: PostingsForMatchers, @@ -43,7 +46,7 @@ func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForM return b } -// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in. +// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in or force is true. type PostingsForMatchersCache struct { calls *sync.Map @@ -52,6 +55,7 @@ type PostingsForMatchersCache struct { ttl time.Duration cacheSize int + force bool // timeNow is the time.Now that can be replaced for testing purposes timeNow func() time.Time @@ -60,7 +64,7 @@ type PostingsForMatchersCache struct { } func (c *PostingsForMatchersCache) PostingsForMatchers(ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { - if !concurrent { + if !concurrent && !c.force { return c.postingsForMatchers(ix, ms...) 
} c.expire() diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go index 4421fc372f..8d238df60b 100644 --- a/tsdb/postings_for_matchers_cache_test.go +++ b/tsdb/postings_for_matchers_cache_test.go @@ -16,8 +16,8 @@ import ( func TestPostingsForMatchersCache(t *testing.T) { const testCacheSize = 5 // newPostingsForMatchersCache tests the NewPostingsForMatcherCache constructor, but overrides the postingsForMatchers func - newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock) *PostingsForMatchersCache { - c := NewPostingsForMatchersCache(ttl, testCacheSize) + newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache { + c := NewPostingsForMatchersCache(ttl, testCacheSize, force) if c.postingsForMatchers == nil { t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func") } @@ -36,7 +36,7 @@ func TestPostingsForMatchersCache(t *testing.T) { require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix) require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms) return index.ErrPostings(expectedPostingsErr), nil - }, &timeNowMock{}) + }, &timeNowMock{}, false) p, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, expectedMatchers...) 
require.NoError(t, err) @@ -52,7 +52,7 @@ func TestPostingsForMatchersCache(t *testing.T) { c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { return nil, expectedErr - }, &timeNowMock{}) + }, &timeNowMock{}, false) _, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) require.Equal(t, expectedErr, err) @@ -61,76 +61,81 @@ func TestPostingsForMatchersCache(t *testing.T) { t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) { for _, cacheEnabled := range []bool{true, false} { t.Run(fmt.Sprintf("cacheEnabled=%t", cacheEnabled), func(t *testing.T) { - calls := [][]*labels.Matcher{ - {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 - {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same - {labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type - {labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name - {labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value - {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 - {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same - } - - // we'll identify results by each call's error, and the error will be the string value of the first matcher - matchersString := func(ms []*labels.Matcher) string { - s := strings.Builder{} - for i, m := range ms { - if i > 0 { - s.WriteByte(',') + for _, forced := range []bool{true, false} { + concurrent := !forced + t.Run(fmt.Sprintf("forced=%t", forced), func(t *testing.T) { + calls := [][]*labels.Matcher{ + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same + {labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, 
// 2: different match type + {labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name + {labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same } - s.WriteString(m.String()) - } - return s.String() - } - expectedResults := make([]string, len(calls)) - for i, c := range calls { - expectedResults[i] = c[0].String() - } - expectedPostingsForMatchersCalls := 5 - // we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic - called := make(chan struct{}, expectedPostingsForMatchersCalls) - release := make(chan struct{}) - var ttl time.Duration - if cacheEnabled { - ttl = defaultPostingsForMatchersCacheTTL - } - c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { - select { - case called <- struct{}{}: - default: - } - <-release - return nil, fmt.Errorf(matchersString(ms)) - }, &timeNowMock{}) + // we'll identify results by each call's error, and the error will be the string value of the first matcher + matchersString := func(ms []*labels.Matcher) string { + s := strings.Builder{} + for i, m := range ms { + if i > 0 { + s.WriteByte(',') + } + s.WriteString(m.String()) + } + return s.String() + } + expectedResults := make([]string, len(calls)) + for i, c := range calls { + expectedResults[i] = c[0].String() + } - results := make([]string, len(calls)) - resultsWg := sync.WaitGroup{} - resultsWg.Add(len(calls)) + expectedPostingsForMatchersCalls := 5 + // we'll block all the calls until we receive the exact amount. 
if we receive more, WaitGroup will panic + called := make(chan struct{}, expectedPostingsForMatchersCalls) + release := make(chan struct{}) + var ttl time.Duration + if cacheEnabled { + ttl = defaultPostingsForMatchersCacheTTL + } + c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + select { + case called <- struct{}{}: + default: + } + <-release + return nil, fmt.Errorf(matchersString(ms)) + }, &timeNowMock{}, forced) - // perform all calls - for i := 0; i < len(calls); i++ { - go func(i int) { - _, err := c.PostingsForMatchers(indexForPostingsMock{}, true, calls[i]...) - results[i] = err.Error() - resultsWg.Done() - }(i) - } + results := make([]string, len(calls)) + resultsWg := sync.WaitGroup{} + resultsWg.Add(len(calls)) - // wait until all calls arrive to the mocked function - for i := 0; i < expectedPostingsForMatchersCalls; i++ { - <-called - } + // perform all calls + for i := 0; i < len(calls); i++ { + go func(i int) { + _, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, calls[i]...) 
+ results[i] = err.Error() + resultsWg.Done() + }(i) + } - // let them all return - close(release) + // wait until all calls arrive to the mocked function + for i := 0; i < expectedPostingsForMatchersCalls; i++ { + <-called + } - // wait for the results - resultsWg.Wait() + // let them all return + close(release) - // check that we got correct results - for i, c := range calls { - require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i]) + // wait for the results + resultsWg.Wait() + + // check that we got correct results + for i, c := range calls { + require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i]) + } + }) } }) } @@ -143,7 +148,7 @@ func TestPostingsForMatchersCache(t *testing.T) { c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { call++ return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil - }, &timeNowMock{}) + }, &timeNowMock{}, false) // first call, fills the cache p, err := c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...) @@ -163,7 +168,7 @@ func TestPostingsForMatchersCache(t *testing.T) { c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { call++ return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil - }, &timeNowMock{}) + }, &timeNowMock{}, false) // first call, fills the cache p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) 
@@ -186,7 +191,7 @@ func TestPostingsForMatchersCache(t *testing.T) { c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { call++ return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil - }, timeNow) + }, timeNow, false) // first call, fills the cache p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) @@ -220,7 +225,7 @@ func TestPostingsForMatchersCache(t *testing.T) { k := matchersKey(ms) callsPerMatchers[k]++ return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil - }, timeNow) + }, timeNow, false) // each one of the first testCacheSize calls is cached properly for _, matchers := range calls {