2021-10-07 06:16:09 -07:00
package tsdb
import (
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
2021-11-18 06:46:46 -08:00
"github.com/prometheus/prometheus/model/labels"
2021-10-07 06:16:09 -07:00
"github.com/prometheus/prometheus/tsdb/index"
)
// TestPostingsForMatchersCache exercises PostingsForMatchersCache with a stubbed
// postingsForMatchers function and a mocked clock. The subtests cover: a single
// successful call, error propagation, several concurrent calls with repeated
// matcher sets, calls that must not be cached (concurrent==false or ttl==0),
// expiry of a cached value once the ttl has elapsed, and eviction once the
// cache holds more than testCacheSize entries.
func TestPostingsForMatchersCache(t *testing.T) {
	const testCacheSize = 5

	// newPostingsForMatchersCache tests the NewPostingsForMatchersCache constructor, but overrides
	// the postingsForMatchers func and the clock with the provided stubs.
	newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock) *PostingsForMatchersCache {
		c := NewPostingsForMatchersCache(ttl, testCacheSize)
		if c.postingsForMatchers == nil {
			t.Fatal("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")
		}
		c.postingsForMatchers = pfm
		c.timeNow = timeMock.timeNow
		return c
	}

	t.Run("happy case one call", func(t *testing.T) {
		for _, concurrent := range []bool{true, false} {
			t.Run(fmt.Sprintf("concurrent=%t", concurrent), func(t *testing.T) {
				expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
				expectedPostingsErr := fmt.Errorf("failed successfully")

				c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
					require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
					require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
					// the returned postings are identified by the error they carry
					return index.ErrPostings(expectedPostingsErr), nil
				}, &timeNowMock{})

				p, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, expectedMatchers...)
				require.NoError(t, err)
				require.NotNil(t, p)
				require.Equal(t, expectedPostingsErr, p.Err(), "Expected ErrPostings with err %q, got %T with err %q", expectedPostingsErr, p, p.Err())
			})
		}
	})

	t.Run("err returned", func(t *testing.T) {
		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
		expectedErr := fmt.Errorf("failed successfully")

		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
			return nil, expectedErr
		}, &timeNowMock{})

		_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
		require.Equal(t, expectedErr, err)
	})

	t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) {
		for _, cacheEnabled := range []bool{true, false} {
			t.Run(fmt.Sprintf("cacheEnabled=%t", cacheEnabled), func(t *testing.T) {
				calls := [][]*labels.Matcher{
					// 1
					{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
					// 1 same
					{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
					// 2: different match type
					{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")},
					// 3: different name
					{labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")},
					// 4: different value
					{labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")},
					// 5
					{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")},
					// 5 same
					{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")},
				}

				// we'll identify results by each call's error, and the error will be the
				// comma-joined string values of all the matchers of that call
				matchersString := func(ms []*labels.Matcher) string {
					s := strings.Builder{}
					for i, m := range ms {
						if i > 0 {
							s.WriteByte(',')
						}
						s.WriteString(m.String())
					}
					return s.String()
				}

				// number of distinct matcher sets in calls; the test waits for this many
				// invocations of the stub before letting any of them return
				const expectedPostingsForMatchersCalls = 5

				// called is buffered to the expected number of invocations and written with a
				// non-blocking send, so any extra invocation is dropped instead of blocking
				called := make(chan struct{}, expectedPostingsForMatchersCalls)
				// release is closed to unblock every pending invocation of the stub at once
				release := make(chan struct{})

				// a zero ttl is used for the cacheEnabled=false variant
				var ttl time.Duration
				if cacheEnabled {
					ttl = defaultPostingsForMatchersCacheTTL
				}

				c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
					select {
					case called <- struct{}{}:
					default:
					}
					<-release
					// constant format string: the matchers text is data, not a format
					return nil, fmt.Errorf("%s", matchersString(ms))
				}, &timeNowMock{})

				results := make([]string, len(calls))
				resultsWg := sync.WaitGroup{}
				resultsWg.Add(len(calls))

				// perform all calls
				for i := 0; i < len(calls); i++ {
					go func(i int) {
						defer resultsWg.Done()
						_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, calls[i]...)
						// a nil error leaves results[i] empty, which fails the comparison below
						// instead of panicking inside this goroutine
						if err != nil {
							results[i] = err.Error()
						}
					}(i)
				}

				// wait until all calls arrive to the mocked function
				for i := 0; i < expectedPostingsForMatchersCalls; i++ {
					<-called
				}

				// let them all return
				close(release)

				// wait for the results
				resultsWg.Wait()

				// check that we got correct results
				for i, c := range calls {
					require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
				}
			})
		}
	})

	t.Run("with concurrent==false, result is not cached", func(t *testing.T) {
		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

		var call int
		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
			call++
			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
		}, &timeNowMock{})

		// first call
		p, err := c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 1")

		// second call within the ttl (we didn't advance the time), should call again because concurrent==false
		p, err = c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 2")
	})

	t.Run("with cache disabled, result is not cached", func(t *testing.T) {
		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

		var call int
		c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
			call++
			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
		}, &timeNowMock{})

		// first call
		p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 1")

		// second call (we didn't advance the time), should call again because the cache was created with ttl 0
		p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 2")
	})

	t.Run("cached value is returned, then it expires", func(t *testing.T) {
		timeNow := &timeNowMock{}
		expectedMatchers := []*labels.Matcher{
			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
		}

		var call int
		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
			call++
			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
		}, timeNow)

		// first call, fills the cache
		p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 1")

		timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)

		// second call within the ttl, should use the cache
		p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 1")

		timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)

		// third call is after ttl (exactly), should call again
		p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 2")
	})

	t.Run("cached value is evicted because cache exceeds max size", func(t *testing.T) {
		timeNow := &timeNowMock{}
		calls := make([][]*labels.Matcher, testCacheSize)
		for i := range calls {
			calls[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))}
		}

		// the stub counts its invocations per matchers key, so every result tells
		// how many times that particular matcher set has been computed
		callsPerMatchers := map[string]int{}
		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
			k := matchersKey(ms)
			callsPerMatchers[k]++
			return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
		}, timeNow)

		// each one of the first testCacheSize calls is cached properly
		for _, matchers := range calls {
			// first call
			p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, matchers...)
			require.NoError(t, err)
			require.EqualError(t, p.Err(), "result from call 1")

			// cached value
			p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, matchers...)
			require.NoError(t, err)
			require.EqualError(t, p.Err(), "result from call 1")
		}

		// one extra call is made, which is cached properly, but evicts the first cached value
		someExtraMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

		// first call
		p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, someExtraMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 1")

		// cached value
		p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, someExtraMatchers...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 1")

		// make first call again, it's calculated again
		p, err = c.PostingsForMatchers(indexForPostingsMock{}, true, calls[0]...)
		require.NoError(t, err)
		require.EqualError(t, p.Err(), "result from call 2")
	})
}
// indexForPostingsMock is a stand-in index that the tests hand to
// PostingsForMatchers as its IndexPostingsReader argument. It carries no state,
// and both of its methods panic when invoked.
type indexForPostingsMock struct{}

// LabelValues is a stub: it always panics with "implement me".
func (indexForPostingsMock) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
	panic("implement me")
}

// Postings is a stub: it always panics with "implement me".
func (indexForPostingsMock) Postings(name string, values ...string) (index.Postings, error) {
	panic("implement me")
}
// timeNowMock offers a mockable time.Now() implementation.
// The empty value is ready to be used, and it should not be copied (use a reference),
// because it embeds a sync.Mutex that guards the now field.
type timeNowMock struct {
	sync.Mutex
	now time.Time
}

// initLocked sets the mocked clock to its fixed starting instant
// (2020-01-02 03:04:05 UTC) if it has not been set yet.
// The caller must hold the mutex.
func (t *timeNowMock) initLocked() {
	if t.now.IsZero() {
		t.now = time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC)
	}
}

// timeNow can be used as a mocked replacement for time.Now().
// It returns the current mocked instant, initialising it on first use.
func (t *timeNowMock) timeNow() time.Time {
	t.Lock()
	defer t.Unlock()

	t.initLocked()
	return t.now
}

// advance advances the mocked time.Now() value by d, initialising the
// clock first if it has never been read or advanced before.
func (t *timeNowMock) advance(d time.Duration) {
	t.Lock()
	defer t.Unlock()

	t.initLocked()
	t.now = t.now.Add(d)
}
// BenchmarkMatchersKey measures matchersKey over a rotating pool of matcher
// sets. The pool is built before the timer is reset, so only the matchersKey
// calls are timed.
func BenchmarkMatchersKey(b *testing.B) {
	const (
		totalMatchers = 10
		matcherSets   = 100
	)

	// Build matcherSets sets of totalMatchers matchers each; the match type
	// cycles through the four types and names/values vary per set and position.
	sets := make([][]*labels.Matcher, matcherSets)
	for setIdx := range sets {
		for pos := 0; pos < totalMatchers; pos++ {
			name := fmt.Sprintf("%d_%d", setIdx*13, pos*65537)
			value := fmt.Sprintf("%x_%x", setIdx*127, pos*2_147_483_647)
			matcher := labels.MustNewMatcher(labels.MatchType(pos%4), name, value)
			sets[setIdx] = append(sets[setIdx], matcher)
		}
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_ = matchersKey(sets[n%matcherSets])
	}
}