Implement PostingsCloner and implement promise/future pattern for PostingsForMatchers (#14)

* Fix tests not closing head/block properly

Close Head properly in TestWalRepair_DecodingError
Close Block properly in  TestReadIndexFormatV1

Co-authored-by: Christian Simon <simon@swine.de>
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* Implement PostingsCloner

PostingsCloner allows obtaining independend clones of a Postings, that
can be used for subsequent calls.

Co-authored-by: Christian Simon <simon@swine.de>
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>

* IndexReader.PostingsForMatchers() and implement Cache

IndexReader now offers the PostingsForMatchers functionality, which is
provided by PostingsForMatchersCache.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Peter Štibraný <peter.stibrany@grafana.com>

Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Peter Štibraný <peter.stibrany@grafana.com>
This commit is contained in:
Christian Simon 2021-10-07 14:16:09 +01:00 committed by GitHub
parent 9173cade01
commit 50c1060328
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 719 additions and 13 deletions

View file

@ -75,6 +75,12 @@ type IndexReader interface {
// during background garbage collections. Input values must be sorted.
Postings(name string, values ...string) (index.Postings, error)
// PostingsForMatchers assembles a single postings iterator based on the given matchers.
// The resulting postings are not ordered by series.
// If concurrent hint is set to true, call will be optimized for a (most likely) concurrent call with same matchers,
// avoiding same calculations twice, however this implementation may lead to a worse performance when called once.
PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error)
// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
@ -309,10 +315,12 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache
}
closers = append(closers, cr)
ir, err := index.NewFileReaderWithCache(filepath.Join(dir, indexFilename), cache)
indexReader, err := index.NewFileReaderWithCache(filepath.Join(dir, indexFilename), cache)
if err != nil {
return nil, err
}
pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize)
ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
closers = append(closers, ir)
tr, sizeTomb, err := tombstones.ReadTombstones(dir)
@ -476,6 +484,10 @@ func (r blockIndexReader) Postings(name string, values ...string) (index.Posting
return p, nil
}
func (r blockIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
return r.ir.PostingsForMatchers(concurrent, ms...)
}
func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}
@ -536,7 +548,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
return ErrClosing
}
p, err := PostingsForMatchers(pb.indexr, ms...)
p, err := pb.indexr.PostingsForMatchers(false, ms...)
if err != nil {
return errors.Wrap(err, "select series")
}

View file

@ -360,6 +360,9 @@ func TestReadIndexFormatV1(t *testing.T) {
blockDir := filepath.Join("testdata", "index_format_v1")
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, block.Close())
})
q, err := NewBlockQuerier(block, 0, 1000)
require.NoError(t, err)

View file

@ -83,6 +83,7 @@ type Head struct {
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
postings *index.MemPostings // Postings lists for terms.
pfmc *PostingsForMatchersCache
tombstones *tombstones.MemTombstones
@ -191,6 +192,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
},
stats: stats,
reg: r,
pfmc: NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize),
}
if err := h.resetInMemoryState(); err != nil {
return nil, err
@ -1065,7 +1068,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
ir := h.indexRange(mint, maxt)
p, err := PostingsForMatchers(ir, ms...)
p, err := ir.PostingsForMatchers(false, ms...)
if err != nil {
return errors.Wrap(err, "select series")
}

View file

@ -110,6 +110,10 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting
return index.Merge(res...), nil
}
func (h *headIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
return h.head.pfmc.PostingsForMatchers(h, concurrent, ms...)
}
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

View file

@ -1400,7 +1400,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
_, corrErr := err.(*wal.CorruptionErr)
require.True(t, corrErr, "reading the wal didn't return corruption error")
require.NoError(t, w.Close())
require.NoError(t, h.Close())
}
// Open the db to trigger a repair.

View file

@ -796,3 +796,28 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
func (it *bigEndianPostings) Err() error {
return nil
}
// PostingsCloner takes an existing Postings and allows independently clone them.
type PostingsCloner struct {
ids []uint64
err error
}
// NewPostingsCloner takes an existing Postings and allows independently clone them.
// The instance provided shouldn't have been used before (no Next() calls should have been done)
// and it shouldn't be used once provided to the PostingsCloner.
func NewPostingsCloner(p Postings) *PostingsCloner {
var ids []uint64
for p.Next() {
ids = append(ids, p.At())
}
return &PostingsCloner{ids: ids, err: p.Err()}
}
// Clone returns another independent Postings instance.
func (c *PostingsCloner) Clone() Postings {
if c.err != nil {
return ErrPostings(c.err)
}
return newListPostings(c.ids...)
}

View file

@ -877,3 +877,126 @@ func TestMemPostings_Delete(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded)
}
func TestPostingsCloner(t *testing.T) {
for _, tc := range []struct {
name string
check func(testing.TB, *PostingsCloner)
}{
{
name: "seek beyond highest value of postings, then other clone seeks higher",
check: func(t testing.TB, pc *PostingsCloner) {
p1 := pc.Clone()
require.False(t, p1.Seek(9))
require.Equal(t, uint64(0), p1.At())
p2 := pc.Clone()
require.False(t, p2.Seek(10))
require.Equal(t, uint64(0), p2.At())
},
},
{
name: "seek beyond highest value of postings, then other clone seeks lower",
check: func(t testing.TB, pc *PostingsCloner) {
p1 := pc.Clone()
require.False(t, p1.Seek(9))
require.Equal(t, uint64(0), p1.At())
p2 := pc.Clone()
require.True(t, p2.Seek(2))
require.Equal(t, uint64(2), p2.At())
},
},
{
name: "seek to posting with value 3 or higher",
check: func(t testing.TB, pc *PostingsCloner) {
p := pc.Clone()
require.True(t, p.Seek(3))
require.Equal(t, uint64(4), p.At())
require.True(t, p.Seek(4))
require.Equal(t, uint64(4), p.At())
},
},
{
name: "seek alternatively on different postings",
check: func(t testing.TB, pc *PostingsCloner) {
p1 := pc.Clone()
require.True(t, p1.Seek(1))
require.Equal(t, uint64(1), p1.At())
p2 := pc.Clone()
require.True(t, p2.Seek(2))
require.Equal(t, uint64(2), p2.At())
p3 := pc.Clone()
require.True(t, p3.Seek(4))
require.Equal(t, uint64(4), p3.At())
p4 := pc.Clone()
require.True(t, p4.Seek(5))
require.Equal(t, uint64(8), p4.At())
require.True(t, p1.Seek(3))
require.Equal(t, uint64(4), p1.At())
require.True(t, p1.Seek(4))
require.Equal(t, uint64(4), p1.At())
},
},
{
name: "iterate through the postings",
check: func(t testing.TB, pc *PostingsCloner) {
p1 := pc.Clone()
p2 := pc.Clone()
// both one step
require.True(t, p1.Next())
require.Equal(t, uint64(1), p1.At())
require.True(t, p2.Next())
require.Equal(t, uint64(1), p2.At())
require.True(t, p1.Next())
require.Equal(t, uint64(2), p1.At())
require.True(t, p1.Next())
require.Equal(t, uint64(4), p1.At())
require.True(t, p1.Next())
require.Equal(t, uint64(8), p1.At())
require.False(t, p1.Next())
require.True(t, p2.Next())
require.Equal(t, uint64(2), p2.At())
require.True(t, p2.Next())
require.Equal(t, uint64(4), p2.At())
},
},
{
name: "at before call of next shouldn't panic",
check: func(t testing.TB, pc *PostingsCloner) {
p := pc.Clone()
require.Equal(t, uint64(0), p.At())
},
},
{
name: "ensure a failed seek doesn't allow more next calls",
check: func(t testing.TB, pc *PostingsCloner) {
p := pc.Clone()
require.False(t, p.Seek(9))
require.Equal(t, uint64(0), p.At())
require.False(t, p.Next())
require.Equal(t, uint64(0), p.At())
},
},
} {
t.Run(tc.name, func(t *testing.T) {
pc := NewPostingsCloner(newListPostings(1, 2, 4, 8))
tc.check(t, pc)
})
}
t.Run("cloning an err postings", func(t *testing.T) {
expectedErr := fmt.Errorf("foobar")
pc := NewPostingsCloner(ErrPostings(expectedErr))
p := pc.Clone()
require.False(t, p.Next())
require.Equal(t, expectedErr, p.Err())
})
}

View file

@ -0,0 +1,201 @@
package tsdb
import (
"container/list"
"strings"
"sync"
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/index"
)
const (
defaultPostingsForMatchersCacheTTL = 10 * time.Second
defaultPostingsForMatchersCacheSize = 100
)
// IndexPostingsReader is a subset of IndexReader methods, the minimum required to evaluate PostingsForMatchers
type IndexPostingsReader interface {
// LabelValues returns possible label values which may not be sorted.
LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)
// Postings returns the postings list iterator for the label pairs.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections. Input values must be sorted.
Postings(name string, values ...string) (index.Postings, error)
}
// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForMatchersCache {
b := &PostingsForMatchersCache{
calls: &sync.Map{},
cached: list.New(),
ttl: ttl,
cacheSize: cacheSize,
timeNow: time.Now,
postingsForMatchers: PostingsForMatchers,
}
return b
}
// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in.
type PostingsForMatchersCache struct {
calls *sync.Map
cachedMtx sync.RWMutex
cached *list.List
ttl time.Duration
cacheSize int
// timeNow is the time.Now that can be replaced for testing purposes
timeNow func() time.Time
// postingsForMatchers can be replaced for testing purposes
postingsForMatchers func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
}
func (c *PostingsForMatchersCache) PostingsForMatchers(ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
if !concurrent {
return c.postingsForMatchers(ix, ms...)
}
c.expire()
return c.postingsForMatchersPromise(ix, ms)()
}
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
var (
wg sync.WaitGroup
cloner *index.PostingsCloner
outerErr error
)
wg.Add(1)
promise := func() (index.Postings, error) {
wg.Wait()
if outerErr != nil {
return nil, outerErr
}
return cloner.Clone(), nil
}
key := matchersKey(ms)
oldPromise, loaded := c.calls.LoadOrStore(key, promise)
if loaded {
return oldPromise.(func() (index.Postings, error))
}
defer wg.Done()
if postings, err := c.postingsForMatchers(ix, ms...); err != nil {
outerErr = err
} else {
cloner = index.NewPostingsCloner(postings)
}
c.created(key, c.timeNow())
return promise
}
type postingsForMatchersCachedCall struct {
key string
ts time.Time
}
func (c *PostingsForMatchersCache) expire() {
if c.ttl <= 0 {
return
}
c.cachedMtx.RLock()
if !c.shouldEvictHead() {
c.cachedMtx.RUnlock()
return
}
c.cachedMtx.RUnlock()
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()
for c.shouldEvictHead() {
c.evictHead()
}
}
// shouldEvictHead returns true if cache head should be evicted, either because it's too old,
// or because the cache has too many elements
// should be called while read lock is held on cachedMtx
func (c *PostingsForMatchersCache) shouldEvictHead() bool {
if c.cached.Len() > c.cacheSize {
return true
}
h := c.cached.Front()
if h == nil {
return false
}
ts := h.Value.(*postingsForMatchersCachedCall).ts
return c.timeNow().Sub(ts) >= c.ttl
}
func (c *PostingsForMatchersCache) evictHead() {
front := c.cached.Front()
oldest := front.Value.(*postingsForMatchersCachedCall)
c.calls.Delete(oldest.key)
c.cached.Remove(front)
}
// created has to be called when returning from the PostingsForMatchers call that creates the promise.
// the ts provided should be the call time.
func (c *PostingsForMatchersCache) created(key string, ts time.Time) {
if c.ttl <= 0 {
c.calls.Delete(key)
return
}
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()
c.cached.PushBack(&postingsForMatchersCachedCall{
key: key,
ts: ts,
})
}
// matchersKey provides a unique string key for the given matchers slice
// NOTE: different orders of matchers will produce different keys,
// but it's unlikely that we'll receive same matchers in different orders at the same time
func matchersKey(ms []*labels.Matcher) string {
const (
typeLen = 2
sepLen = 1
)
var size int
for _, m := range ms {
size += len(m.Name) + len(m.Value) + typeLen + sepLen
}
sb := strings.Builder{}
sb.Grow(size)
for _, m := range ms {
sb.WriteString(m.Name)
sb.WriteString(m.Type.String())
sb.WriteString(m.Value)
sb.WriteByte(0)
}
key := sb.String()
return key
}
// indexReaderWithPostingsForMatchers adapts an index.Reader to be an IndexReader by adding the PostingsForMatchers method
type indexReaderWithPostingsForMatchers struct {
*index.Reader
pfmc *PostingsForMatchersCache
}
func (ir indexReaderWithPostingsForMatchers) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
return ir.pfmc.PostingsForMatchers(ir, concurrent, ms...)
}
var _ IndexReader = indexReaderWithPostingsForMatchers{}

View file

@ -0,0 +1,308 @@
package tsdb
import (
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/index"
)
func TestPostingsForMatchersCache(t *testing.T) {
const testCacheSize = 5
// newPostingsForMatchersCache tests the NewPostingsForMatcherCache constructor, but overrides the postingsForMatchers func
newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, testCacheSize)
if c.postingsForMatchers == nil {
t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")
}
c.postingsForMatchers = pfm
c.timeNow = timeMock.timeNow
return c
}
t.Run("happy case one call", func(t *testing.T) {
for _, concurrent := range []bool{true, false} {
t.Run(fmt.Sprintf("concurrent=%t", concurrent), func(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
expectedPostingsErr := fmt.Errorf("failed successfully")
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
return index.ErrPostings(expectedPostingsErr), nil
}, &timeNowMock{})
p, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, expectedMatchers...)
require.NoError(t, err)
require.NotNil(t, p)
require.Equal(t, p.Err(), expectedPostingsErr, "Expected ErrPostings with err %q, got %T with err %q", expectedPostingsErr, p, p.Err())
})
}
})
t.Run("err returned", func(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
expectedErr := fmt.Errorf("failed successfully")
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
return nil, expectedErr
}, &timeNowMock{})
_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.Equal(t, expectedErr, err)
})
t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) {
for _, cacheEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("cacheEnabled=%t", cacheEnabled), func(t *testing.T) {
calls := [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same
{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type
{labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name
{labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same
}
// we'll identify results by each call's error, and the error will be the string value of the first matcher
matchersString := func(ms []*labels.Matcher) string {
s := strings.Builder{}
for i, m := range ms {
if i > 0 {
s.WriteByte(',')
}
s.WriteString(m.String())
}
return s.String()
}
expectedResults := make([]string, len(calls))
for i, c := range calls {
expectedResults[i] = c[0].String()
}
expectedPostingsForMatchersCalls := 5
// we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic
called := make(chan struct{}, expectedPostingsForMatchersCalls)
release := make(chan struct{})
var ttl time.Duration
if cacheEnabled {
ttl = defaultPostingsForMatchersCacheTTL
}
c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
select {
case called <- struct{}{}:
default:
}
<-release
return nil, fmt.Errorf(matchersString(ms))
}, &timeNowMock{})
results := make([]string, len(calls))
resultsWg := sync.WaitGroup{}
resultsWg.Add(len(calls))
// perform all calls
for i := 0; i < len(calls); i++ {
go func(i int) {
_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, calls[i]...)
results[i] = err.Error()
resultsWg.Done()
}(i)
}
// wait until all calls arrive to the mocked function
for i := 0; i < expectedPostingsForMatchersCalls; i++ {
<-called
}
// let them all return
close(release)
// wait for the results
resultsWg.Wait()
// check that we got correct results
for i, c := range calls {
require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
}
})
}
})
t.Run("with concurrent==false, result is not cached", func(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
var call int
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{})
// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
// second call within the ttl (we didn't advance the time), should call again because concurrent==false
p, err = c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 2")
})
t.Run("with cache disabled, result is not cached", func(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
var call int
c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{})
// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
// second call within the ttl (we didn't advance the time), should call again because concurrent==false
p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 2")
})
t.Run("cached value is returned, then it expires", func(t *testing.T) {
timeNow := &timeNowMock{}
expectedMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}
var call int
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, timeNow)
// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)
// second call within the ttl, should use the cache
p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)
// third call is after ttl (exactly), should call again
p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 2")
})
t.Run("cached value is evicted because cache exceeds max size", func(t *testing.T) {
timeNow := &timeNowMock{}
calls := make([][]*labels.Matcher, testCacheSize)
for i := range calls {
calls[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))}
}
callsPerMatchers := map[string]int{}
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
k := matchersKey(ms)
callsPerMatchers[k]++
return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
}, timeNow)
// each one of the first testCacheSize calls is cached properly
for _, matchers := range calls {
// first call
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, matchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
// cached value
p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, matchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
}
// one extra call is made, which is cached properly, but evicts the first cached value
someExtraMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
// first call
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, someExtraMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
// cached value
p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, someExtraMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
// make first call agian, it's calculated again
p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, calls[0]...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 2")
})
}
type indexForPostingsMock struct{}
func (idx indexForPostingsMock) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
panic("implement me")
}
func (idx indexForPostingsMock) Postings(name string, values ...string) (index.Postings, error) {
panic("implement me")
}
// timeNowMock offers a mockable time.Now() implementation
// empty value is ready to be used, and it should not be copied (use a reference)
type timeNowMock struct {
sync.Mutex
now time.Time
}
// timeNow can be used as a mocked replacement for time.Now()
func (t *timeNowMock) timeNow() time.Time {
t.Lock()
defer t.Unlock()
if t.now.IsZero() {
t.now = time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC)
}
return t.now
}
// advance advances the mocked time.Now() value
func (t *timeNowMock) advance(d time.Duration) {
t.Lock()
defer t.Unlock()
if t.now.IsZero() {
t.now = time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC)
}
t.now = t.now.Add(d)
}
func BenchmarkMatchersKey(b *testing.B) {
const totalMatchers = 10
const matcherSets = 100
sets := make([][]*labels.Matcher, matcherSets)
for i := 0; i < matcherSets; i++ {
for j := 0; j < totalMatchers; j++ {
sets[i] = append(sets[i], labels.MustNewMatcher(labels.MatchType(j%4), fmt.Sprintf("%d_%d", i*13, j*65537), fmt.Sprintf("%x_%x", i*127, j*2_147_483_647)))
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = matchersKey(sets[i%matcherSets])
}
}

View file

@ -107,11 +107,12 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
mint := q.mint
maxt := q.maxt
p, err := PostingsForMatchers(q.index, ms...)
sharded := hints != nil && hints.ShardCount > 0
p, err := q.index.PostingsForMatchers(sharded, ms...)
if err != nil {
return storage.ErrSeriesSet(err)
}
if hints != nil && hints.ShardCount > 0 {
if sharded {
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
}
if sortSeries {
@ -151,11 +152,12 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
mint = hints.Start
maxt = hints.End
}
p, err := PostingsForMatchers(q.index, ms...)
sharded := hints != nil && hints.ShardCount > 0
p, err := q.index.PostingsForMatchers(sharded, ms...)
if err != nil {
return storage.ErrChunkSeriesSet(err)
}
if hints != nil && hints.ShardCount > 0 {
if sharded {
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
}
if sortSeries {
@ -166,7 +168,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
var its, notIts []index.Postings
// See which label must be non-empty.
// Optimization for case like {l=~".", l!="1"}.
@ -248,7 +250,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
return it, nil
}
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
func postingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) {
// This method will not return postings for missing labels.
// Fast-path for equal matching.
@ -293,7 +295,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
}
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) {
vals, err := ix.LabelValues(m.Name)
if err != nil {
return nil, err
@ -325,7 +327,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat
}
var p index.Postings
p, err = PostingsForMatchers(r, append(matchers, requireLabel)...)
p, err = r.PostingsForMatchers(false, append(matchers, requireLabel)...)
if err != nil {
return nil, err
}
@ -356,7 +358,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat
}
func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
p, err := PostingsForMatchers(r, matchers...)
p, err := r.PostingsForMatchers(false, matchers...)
if err != nil {
return nil, err
}

View file

@ -1216,6 +1216,27 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}
func (m mockIndex) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
var ps []uint64
for p, s := range m.series {
if matches(ms, s.l) {
ps = append(ps, p)
}
}
sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
return index.NewListPostings(ps), nil
}
func matches(ms []*labels.Matcher, lbls labels.Labels) bool {
lm := lbls.Map()
for _, m := range ms {
if !m.Matches(lm[m.Name]) {
return false
}
}
return true
}
func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
out := make([]uint64, 0, 128)
@ -2004,6 +2025,10 @@ func (m mockMatcherIndex) Postings(name string, values ...string) (index.Posting
return index.EmptyPostings(), nil
}
func (m mockMatcherIndex) PostingsForMatchers(bool, ...*labels.Matcher) (index.Postings, error) {
return index.EmptyPostings(), nil
}
func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings {
return index.EmptyPostings()
}