diff --git a/tsdb/block.go b/tsdb/block.go index bb51825a2..f5c97d48a 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -75,6 +75,12 @@ type IndexReader interface { // during background garbage collections. Input values must be sorted. Postings(name string, values ...string) (index.Postings, error) + // PostingsForMatchers assembles a single postings iterator based on the given matchers. + // The resulting postings are not ordered by series. + // If concurrent hint is set to true, call will be optimized for a (most likely) concurrent call with same matchers, + // avoiding same calculations twice, however this implementation may lead to a worse performance when called once. + PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) + // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings @@ -309,10 +315,12 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache } closers = append(closers, cr) - ir, err := index.NewFileReaderWithCache(filepath.Join(dir, indexFilename), cache) + indexReader, err := index.NewFileReaderWithCache(filepath.Join(dir, indexFilename), cache) if err != nil { return nil, err } + pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize) + ir := indexReaderWithPostingsForMatchers{indexReader, pfmc} closers = append(closers, ir) tr, sizeTomb, err := tombstones.ReadTombstones(dir) @@ -476,6 +484,10 @@ func (r blockIndexReader) Postings(name string, values ...string) (index.Posting return p, nil } +func (r blockIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + return r.ir.PostingsForMatchers(concurrent, ms...) 
+} + func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { return r.ir.SortedPostings(p) } @@ -536,7 +548,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return ErrClosing } - p, err := PostingsForMatchers(pb.indexr, ms...) + p, err := pb.indexr.PostingsForMatchers(false, ms...) if err != nil { return errors.Wrap(err, "select series") } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 54cfbc2c4..e290bf9f6 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -360,6 +360,9 @@ func TestReadIndexFormatV1(t *testing.T) { blockDir := filepath.Join("testdata", "index_format_v1") block, err := OpenBlock(nil, blockDir, nil) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, block.Close()) + }) q, err := NewBlockQuerier(block, 0, 1000) require.NoError(t, err) diff --git a/tsdb/head.go b/tsdb/head.go index 4f3422904..6b0815ccb 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -83,6 +83,7 @@ type Head struct { deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until. postings *index.MemPostings // Postings lists for terms. + pfmc *PostingsForMatchersCache tombstones *tombstones.MemTombstones @@ -191,6 +192,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti }, stats: stats, reg: r, + + pfmc: NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize), } if err := h.resetInMemoryState(); err != nil { return nil, err @@ -1065,7 +1068,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { ir := h.indexRange(mint, maxt) - p, err := PostingsForMatchers(ir, ms...) + p, err := ir.PostingsForMatchers(false, ms...) 
if err != nil { return errors.Wrap(err, "select series") } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 99b87221a..4edb5c32f 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -110,6 +110,10 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting return index.Merge(res...), nil } +func (h *headIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + return h.head.pfmc.PostingsForMatchers(h, concurrent, ms...) +} + func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { series := make([]*memSeries, 0, 128) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 330e8c403..aeec21559 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1400,7 +1400,7 @@ func TestWalRepair_DecodingError(t *testing.T) { err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. _, corrErr := err.(*wal.CorruptionErr) require.True(t, corrErr, "reading the wal didn't return corruption error") - require.NoError(t, w.Close()) + require.NoError(t, h.Close()) } // Open the db to trigger a repair. diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 6c493be1d..c9aebead0 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -796,3 +796,28 @@ func (it *bigEndianPostings) Seek(x uint64) bool { func (it *bigEndianPostings) Err() error { return nil } + +// PostingsCloner takes an existing Postings and allows cloning it independently. +type PostingsCloner struct { + ids []uint64 + err error +} + +// NewPostingsCloner takes an existing Postings and allows cloning it independently. +// The instance provided shouldn't have been used before (no Next() calls should have been done) +// and it shouldn't be used once provided to the PostingsCloner. 
+func NewPostingsCloner(p Postings) *PostingsCloner { + var ids []uint64 + for p.Next() { + ids = append(ids, p.At()) + } + return &PostingsCloner{ids: ids, err: p.Err()} +} + +// Clone returns another independent Postings instance. +func (c *PostingsCloner) Clone() Postings { + if c.err != nil { + return ErrPostings(c.err) + } + return newListPostings(c.ids...) +} diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index bbf5332a5..c2503d4dd 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -877,3 +877,126 @@ func TestMemPostings_Delete(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded) } + +func TestPostingsCloner(t *testing.T) { + for _, tc := range []struct { + name string + check func(testing.TB, *PostingsCloner) + }{ + { + name: "seek beyond highest value of postings, then other clone seeks higher", + check: func(t testing.TB, pc *PostingsCloner) { + p1 := pc.Clone() + require.False(t, p1.Seek(9)) + require.Equal(t, uint64(0), p1.At()) + + p2 := pc.Clone() + require.False(t, p2.Seek(10)) + require.Equal(t, uint64(0), p2.At()) + }, + }, + { + name: "seek beyond highest value of postings, then other clone seeks lower", + check: func(t testing.TB, pc *PostingsCloner) { + p1 := pc.Clone() + require.False(t, p1.Seek(9)) + require.Equal(t, uint64(0), p1.At()) + + p2 := pc.Clone() + require.True(t, p2.Seek(2)) + require.Equal(t, uint64(2), p2.At()) + }, + }, + { + name: "seek to posting with value 3 or higher", + check: func(t testing.TB, pc *PostingsCloner) { + p := pc.Clone() + require.True(t, p.Seek(3)) + require.Equal(t, uint64(4), p.At()) + require.True(t, p.Seek(4)) + require.Equal(t, uint64(4), p.At()) + }, + }, + { + name: "seek alternatively on different postings", + check: func(t testing.TB, pc *PostingsCloner) { + p1 := pc.Clone() + require.True(t, p1.Seek(1)) + require.Equal(t, uint64(1), p1.At()) + + p2 := pc.Clone() + require.True(t, 
p2.Seek(2)) + require.Equal(t, uint64(2), p2.At()) + + p3 := pc.Clone() + require.True(t, p3.Seek(4)) + require.Equal(t, uint64(4), p3.At()) + + p4 := pc.Clone() + require.True(t, p4.Seek(5)) + require.Equal(t, uint64(8), p4.At()) + + require.True(t, p1.Seek(3)) + require.Equal(t, uint64(4), p1.At()) + require.True(t, p1.Seek(4)) + require.Equal(t, uint64(4), p1.At()) + }, + }, + { + name: "iterate through the postings", + check: func(t testing.TB, pc *PostingsCloner) { + p1 := pc.Clone() + p2 := pc.Clone() + + // both one step + require.True(t, p1.Next()) + require.Equal(t, uint64(1), p1.At()) + require.True(t, p2.Next()) + require.Equal(t, uint64(1), p2.At()) + + require.True(t, p1.Next()) + require.Equal(t, uint64(2), p1.At()) + require.True(t, p1.Next()) + require.Equal(t, uint64(4), p1.At()) + require.True(t, p1.Next()) + require.Equal(t, uint64(8), p1.At()) + require.False(t, p1.Next()) + + require.True(t, p2.Next()) + require.Equal(t, uint64(2), p2.At()) + require.True(t, p2.Next()) + require.Equal(t, uint64(4), p2.At()) + }, + }, + { + name: "at before call of next shouldn't panic", + check: func(t testing.TB, pc *PostingsCloner) { + p := pc.Clone() + require.Equal(t, uint64(0), p.At()) + }, + }, + { + name: "ensure a failed seek doesn't allow more next calls", + check: func(t testing.TB, pc *PostingsCloner) { + p := pc.Clone() + require.False(t, p.Seek(9)) + require.Equal(t, uint64(0), p.At()) + require.False(t, p.Next()) + require.Equal(t, uint64(0), p.At()) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + pc := NewPostingsCloner(newListPostings(1, 2, 4, 8)) + tc.check(t, pc) + }) + } + + t.Run("cloning an err postings", func(t *testing.T) { + expectedErr := fmt.Errorf("foobar") + pc := NewPostingsCloner(ErrPostings(expectedErr)) + p := pc.Clone() + require.False(t, p.Next()) + require.Equal(t, expectedErr, p.Err()) + }) +} diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go new file mode 100644 index 
000000000..061bec871 --- /dev/null +++ b/tsdb/postings_for_matchers_cache.go @@ -0,0 +1,201 @@ +package tsdb + +import ( + "container/list" + "strings" + "sync" + "time" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/index" +) + +const ( + defaultPostingsForMatchersCacheTTL = 10 * time.Second + defaultPostingsForMatchersCacheSize = 100 +) + +// IndexPostingsReader is a subset of IndexReader methods, the minimum required to evaluate PostingsForMatchers +type IndexPostingsReader interface { + // LabelValues returns possible label values which may not be sorted. + LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) + + // Postings returns the postings list iterator for the label pairs. + // The Postings here contain the offsets to the series inside the index. + // Found IDs are not strictly required to point to a valid Series, e.g. + // during background garbage collections. Input values must be sorted. + Postings(name string, values ...string) (index.Postings, error) +} + +// NewPostingsForMatchersCache creates a new PostingsForMatchersCache. +func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForMatchersCache { + b := &PostingsForMatchersCache{ + calls: &sync.Map{}, + cached: list.New(), + + ttl: ttl, + cacheSize: cacheSize, + + timeNow: time.Now, + postingsForMatchers: PostingsForMatchers, + } + + return b +} + +// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in. 
+type PostingsForMatchersCache struct { + calls *sync.Map + + cachedMtx sync.RWMutex + cached *list.List + + ttl time.Duration + cacheSize int + + // timeNow is the time.Now that can be replaced for testing purposes + timeNow func() time.Time + // postingsForMatchers can be replaced for testing purposes + postingsForMatchers func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) +} + +func (c *PostingsForMatchersCache) PostingsForMatchers(ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + if !concurrent { + return c.postingsForMatchers(ix, ms...) + } + c.expire() + return c.postingsForMatchersPromise(ix, ms)() +} + +func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) { + var ( + wg sync.WaitGroup + cloner *index.PostingsCloner + outerErr error + ) + wg.Add(1) + + promise := func() (index.Postings, error) { + wg.Wait() + if outerErr != nil { + return nil, outerErr + } + return cloner.Clone(), nil + } + + key := matchersKey(ms) + oldPromise, loaded := c.calls.LoadOrStore(key, promise) + if loaded { + return oldPromise.(func() (index.Postings, error)) + } + defer wg.Done() + + if postings, err := c.postingsForMatchers(ix, ms...); err != nil { + outerErr = err + } else { + cloner = index.NewPostingsCloner(postings) + } + + c.created(key, c.timeNow()) + return promise +} + +type postingsForMatchersCachedCall struct { + key string + ts time.Time +} + +func (c *PostingsForMatchersCache) expire() { + if c.ttl <= 0 { + return + } + + c.cachedMtx.RLock() + if !c.shouldEvictHead() { + c.cachedMtx.RUnlock() + return + } + c.cachedMtx.RUnlock() + + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + + for c.shouldEvictHead() { + c.evictHead() + } +} + +// shouldEvictHead returns true if cache head should be evicted, either because it's too old, +// or because the cache has too many elements +// should be called while read lock is 
held on cachedMtx +func (c *PostingsForMatchersCache) shouldEvictHead() bool { + if c.cached.Len() > c.cacheSize { + return true + } + h := c.cached.Front() + if h == nil { + return false + } + ts := h.Value.(*postingsForMatchersCachedCall).ts + return c.timeNow().Sub(ts) >= c.ttl +} + +func (c *PostingsForMatchersCache) evictHead() { + front := c.cached.Front() + oldest := front.Value.(*postingsForMatchersCachedCall) + c.calls.Delete(oldest.key) + c.cached.Remove(front) +} + +// created has to be called when returning from the PostingsForMatchers call that creates the promise. +// the ts provided should be the call time. +func (c *PostingsForMatchersCache) created(key string, ts time.Time) { + if c.ttl <= 0 { + c.calls.Delete(key) + return + } + + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + + c.cached.PushBack(&postingsForMatchersCachedCall{ + key: key, + ts: ts, + }) +} + +// matchersKey provides a unique string key for the given matchers slice +// NOTE: different orders of matchers will produce different keys, +// but it's unlikely that we'll receive same matchers in different orders at the same time +func matchersKey(ms []*labels.Matcher) string { + const ( + typeLen = 2 + sepLen = 1 + ) + var size int + for _, m := range ms { + size += len(m.Name) + len(m.Value) + typeLen + sepLen + } + sb := strings.Builder{} + sb.Grow(size) + for _, m := range ms { + sb.WriteString(m.Name) + sb.WriteString(m.Type.String()) + sb.WriteString(m.Value) + sb.WriteByte(0) + } + key := sb.String() + return key +} + +// indexReaderWithPostingsForMatchers adapts an index.Reader to be an IndexReader by adding the PostingsForMatchers method +type indexReaderWithPostingsForMatchers struct { + *index.Reader + pfmc *PostingsForMatchersCache +} + +func (ir indexReaderWithPostingsForMatchers) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + return ir.pfmc.PostingsForMatchers(ir, concurrent, ms...) 
+} + +var _ IndexReader = indexReaderWithPostingsForMatchers{} diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go new file mode 100644 index 000000000..c0e35e42b --- /dev/null +++ b/tsdb/postings_for_matchers_cache_test.go @@ -0,0 +1,308 @@ +package tsdb + +import ( + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/index" +) + +func TestPostingsForMatchersCache(t *testing.T) { + const testCacheSize = 5 + // newPostingsForMatchersCache tests the NewPostingsForMatcherCache constructor, but overrides the postingsForMatchers func + newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock) *PostingsForMatchersCache { + c := NewPostingsForMatchersCache(ttl, testCacheSize) + if c.postingsForMatchers == nil { + t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func") + } + c.postingsForMatchers = pfm + c.timeNow = timeMock.timeNow + return c + } + + t.Run("happy case one call", func(t *testing.T) { + for _, concurrent := range []bool{true, false} { + t.Run(fmt.Sprintf("concurrent=%t", concurrent), func(t *testing.T) { + expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")} + expectedPostingsErr := fmt.Errorf("failed successfully") + + c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix) + require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms) + return index.ErrPostings(expectedPostingsErr), nil + }, &timeNowMock{}) + + p, 
err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, expectedMatchers...) + require.NoError(t, err) + require.NotNil(t, p) + require.Equal(t, p.Err(), expectedPostingsErr, "Expected ErrPostings with err %q, got %T with err %q", expectedPostingsErr, p, p.Err()) + }) + } + }) + + t.Run("err returned", func(t *testing.T) { + expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")} + expectedErr := fmt.Errorf("failed successfully") + + c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + return nil, expectedErr + }, &timeNowMock{}) + + _, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) + require.Equal(t, expectedErr, err) + }) + + t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) { + for _, cacheEnabled := range []bool{true, false} { + t.Run(fmt.Sprintf("cacheEnabled=%t", cacheEnabled), func(t *testing.T) { + calls := [][]*labels.Matcher{ + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same + {labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type + {labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name + {labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 + {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same + } + + // we'll identify results by each call's error, and the error will be the string value of the first matcher + matchersString := func(ms []*labels.Matcher) string { + s := strings.Builder{} + for i, m := range ms { + if i > 0 { + s.WriteByte(',') + } + s.WriteString(m.String()) + } + return 
s.String() + } + expectedResults := make([]string, len(calls)) + for i, c := range calls { + expectedResults[i] = c[0].String() + } + + expectedPostingsForMatchersCalls := 5 + // we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic + called := make(chan struct{}, expectedPostingsForMatchersCalls) + release := make(chan struct{}) + var ttl time.Duration + if cacheEnabled { + ttl = defaultPostingsForMatchersCacheTTL + } + c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + select { + case called <- struct{}{}: + default: + } + <-release + return nil, fmt.Errorf(matchersString(ms)) + }, &timeNowMock{}) + + results := make([]string, len(calls)) + resultsWg := sync.WaitGroup{} + resultsWg.Add(len(calls)) + + // perform all calls + for i := 0; i < len(calls); i++ { + go func(i int) { + _, err := c.PostingsForMatchers(indexForPostingsMock{}, true, calls[i]...) + results[i] = err.Error() + resultsWg.Done() + }(i) + } + + // wait until all calls arrive to the mocked function + for i := 0; i < expectedPostingsForMatchersCalls; i++ { + <-called + } + + // let them all return + close(release) + + // wait for the results + resultsWg.Wait() + + // check that we got correct results + for i, c := range calls { + require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i]) + } + }) + } + }) + + t.Run("with concurrent==false, result is not cached", func(t *testing.T) { + expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")} + + var call int + c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + call++ + return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil + }, &timeNowMock{}) + + // first call, fills the cache + p, err := 
c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + // second call within the ttl (we didn't advance the time), should call again because concurrent==false + p, err = c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 2") + }) + + t.Run("with cache disabled, result is not cached", func(t *testing.T) { + expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")} + + var call int + c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + call++ + return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil + }, &timeNowMock{}) + + // first call, result is not kept because the cache is disabled (ttl == 0) + p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + // second call (we didn't advance the time), should call again because the cache is disabled (ttl == 0) + p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 2") + }) + + t.Run("cached value is returned, then it expires", func(t *testing.T) { + timeNow := &timeNowMock{} + expectedMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + } + + var call int + c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + call++ + return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil + }, timeNow) + + // first call, fills the cache + p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) 
+ require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + timeNow.advance(defaultPostingsForMatchersCacheTTL / 2) + + // second call within the ttl, should use the cache + p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + timeNow.advance(defaultPostingsForMatchersCacheTTL / 2) + + // third call is after ttl (exactly), should call again + p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 2") + }) + + t.Run("cached value is evicted because cache exceeds max size", func(t *testing.T) { + timeNow := &timeNowMock{} + calls := make([][]*labels.Matcher, testCacheSize) + for i := range calls { + calls[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))} + } + + callsPerMatchers := map[string]int{} + c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + k := matchersKey(ms) + callsPerMatchers[k]++ + return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil + }, timeNow) + + // each one of the first testCacheSize calls is cached properly + for _, matchers := range calls { + // first call + p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, matchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + // cached value + p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, matchers...) 
+ require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + } + + // one extra call is made, which is cached properly, but evicts the first cached value + someExtraMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")} + // first call + p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, someExtraMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + // cached value + p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, someExtraMatchers...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 1") + + // make first call again, it's calculated again + p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, calls[0]...) + require.NoError(t, err) + require.EqualError(t, p.Err(), "result from call 2") + }) +} + +type indexForPostingsMock struct{} + +func (idx indexForPostingsMock) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + panic("implement me") +} + +func (idx indexForPostingsMock) Postings(name string, values ...string) (index.Postings, error) { + panic("implement me") +} + +// timeNowMock offers a mockable time.Now() implementation +// empty value is ready to be used, and it should not be copied (use a reference) +type timeNowMock struct { + sync.Mutex + now time.Time +} + +// timeNow can be used as a mocked replacement for time.Now() +func (t *timeNowMock) timeNow() time.Time { + t.Lock() + defer t.Unlock() + if t.now.IsZero() { + t.now = time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC) + } + return t.now +} + +// advance advances the mocked time.Now() value +func (t *timeNowMock) advance(d time.Duration) { + t.Lock() + defer t.Unlock() + if t.now.IsZero() { + t.now = time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC) + } + t.now = t.now.Add(d) +} + +func BenchmarkMatchersKey(b *testing.B) { + const totalMatchers = 10 + const matcherSets = 100 + sets := make([][]*labels.Matcher, matcherSets) + for i := 
0; i < matcherSets; i++ { + for j := 0; j < totalMatchers; j++ { + sets[i] = append(sets[i], labels.MustNewMatcher(labels.MatchType(j%4), fmt.Sprintf("%d_%d", i*13, j*65537), fmt.Sprintf("%x_%x", i*127, j*2_147_483_647))) + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = matchersKey(sets[i%matcherSets]) + } +} diff --git a/tsdb/querier.go b/tsdb/querier.go index 1693ac24b..209e4a8df 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -107,11 +107,12 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { mint := q.mint maxt := q.maxt - p, err := PostingsForMatchers(q.index, ms...) + sharded := hints != nil && hints.ShardCount > 0 + p, err := q.index.PostingsForMatchers(sharded, ms...) if err != nil { return storage.ErrSeriesSet(err) } - if hints != nil && hints.ShardCount > 0 { + if sharded { p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { @@ -151,11 +152,12 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, mint = hints.Start maxt = hints.End } - p, err := PostingsForMatchers(q.index, ms...) + sharded := hints != nil && hints.ShardCount > 0 + p, err := q.index.PostingsForMatchers(sharded, ms...) if err != nil { return storage.ErrChunkSeriesSet(err) } - if hints != nil && hints.ShardCount > 0 { + if sharded { p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { @@ -166,7 +168,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, // PostingsForMatchers assembles a single postings iterator against the index reader // based on the given matchers. The resulting postings are not ordered by series. 
-func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) { +func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { var its, notIts []index.Postings // See which label must be non-empty. // Optimization for case like {l=~".", l!="1"}. @@ -248,7 +250,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, return it, nil } -func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) { +func postingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) { // This method will not return postings for missing labels. // Fast-path for equal matching. @@ -293,7 +295,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro } // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. -func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) { +func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) { vals, err := ix.LabelValues(m.Name) if err != nil { return nil, err @@ -325,7 +327,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat } var p index.Postings - p, err = PostingsForMatchers(r, append(matchers, requireLabel)...) + p, err = r.PostingsForMatchers(false, append(matchers, requireLabel)...) if err != nil { return nil, err } @@ -356,7 +358,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat } func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]string, error) { - p, err := PostingsForMatchers(r, matchers...) + p, err := r.PostingsForMatchers(false, matchers...) 
if err != nil { return nil, err } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 224835578..2c3223f5e 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1216,6 +1216,27 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { return index.NewListPostings(ep) } +func (m mockIndex) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + var ps []uint64 + for p, s := range m.series { + if matches(ms, s.l) { + ps = append(ps, p) + } + } + sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] }) + return index.NewListPostings(ps), nil +} + +func matches(ms []*labels.Matcher, lbls labels.Labels) bool { + lm := lbls.Map() + for _, m := range ms { + if !m.Matches(lm[m.Name]) { + return false + } + } + return true +} + func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { out := make([]uint64, 0, 128) @@ -2004,6 +2025,10 @@ func (m mockMatcherIndex) Postings(name string, values ...string) (index.Posting return index.EmptyPostings(), nil } +func (m mockMatcherIndex) PostingsForMatchers(bool, ...*labels.Matcher) (index.Postings, error) { + return index.EmptyPostings(), nil +} + func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings { return index.EmptyPostings() }