Added series hash cache support to TSDB (#5)

* Added series hash cache support to TSDB

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed imports grouping

Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
Marco Pracucci 2021-08-17 15:31:08 +02:00 committed by GitHub
parent a3ad212658
commit 481299f4a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 554 additions and 88 deletions

View file

@ -283,7 +283,7 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) {
if logger == nil {
logger = log.NewNopLogger()
}
@ -304,7 +304,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, cr)
ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
ir, err := index.NewFileReader(filepath.Join(dir, indexFilename), cache)
if err != nil {
return nil, err
}

View file

@ -64,14 +64,14 @@ func TestSetCompactionFailed(t *testing.T) {
}()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
b, err := OpenBlock(nil, blockDir, nil)
b, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
require.Equal(t, false, b.meta.Compaction.Failed)
require.NoError(t, b.setCompactionFailed())
require.Equal(t, true, b.meta.Compaction.Failed)
require.NoError(t, b.Close())
b, err = OpenBlock(nil, blockDir, nil)
b, err = OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
require.Equal(t, true, b.meta.Compaction.Failed)
require.NoError(t, b.Close())
@ -83,7 +83,7 @@ func TestCreateBlock(t *testing.T) {
defer func() {
require.NoError(t, os.RemoveAll(tmpdir))
}()
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil, nil)
if err == nil {
require.NoError(t, b.Close())
}
@ -193,7 +193,7 @@ func TestCorruptedChunk(t *testing.T) {
require.NoError(t, f.Close())
// Check open err.
b, err := OpenBlock(nil, blockDir, nil)
b, err := OpenBlock(nil, blockDir, nil, nil)
if tc.openErr != nil {
require.Equal(t, tc.openErr.Error(), err.Error())
return
@ -235,7 +235,7 @@ func TestLabelValuesWithMatchers(t *testing.T) {
require.Greater(t, len(files), 0, "No chunk created.")
// Check open err.
block, err := OpenBlock(nil, blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, block.Close()) }()
@ -303,7 +303,7 @@ func TestBlockSize(t *testing.T) {
// Create a block and compare the reported size vs actual disk size.
{
blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
blockInit, err = OpenBlock(nil, blockDirInit, nil)
blockInit, err = OpenBlock(nil, blockDirInit, nil, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, blockInit.Close())
@ -327,7 +327,7 @@ func TestBlockSize(t *testing.T) {
require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err)
blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil)
blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, blockAfterCompact.Close())
@ -358,7 +358,7 @@ func TestReadIndexFormatV1(t *testing.T) {
*/
blockDir := filepath.Join("testdata", "index_format_v1")
block, err := OpenBlock(nil, blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
q, err := NewBlockQuerier(block, 0, 1000)
@ -398,7 +398,7 @@ func BenchmarkLabelValuesWithMatchers(b *testing.B) {
require.Greater(b, len(files), 0, "No chunk created.")
// Check open err.
block, err := OpenBlock(nil, blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(b, err)
defer func() { require.NoError(b, block.Close()) }()
@ -452,7 +452,7 @@ func TestLabelNamesWithMatchers(t *testing.T) {
require.Greater(t, len(files), 0, "No chunk created.")
// Check open err.
block, err := OpenBlock(nil, blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, block.Close()) })

View file

@ -50,7 +50,7 @@ func TestBlockWriter(t *testing.T) {
// Confirm the block has the correct data.
blockpath := filepath.Join(outputDir, id.String())
b, err := OpenBlock(nil, blockpath, nil)
b, err := OpenBlock(nil, blockpath, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, b.Close()) }()
q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64)

View file

@ -418,7 +418,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if b == nil {
var err error
b, err = OpenBlock(c.logger, d, c.chunkPool)
b, err = OpenBlock(c.logger, d, c.chunkPool, nil)
if err != nil {
return uid, err
}

View file

@ -1056,7 +1056,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs := make([]string, 0, len(c.ranges))
var blocks []*Block
for _, r := range c.ranges {
block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil)
block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil, nil)
require.NoError(b, err)
blocks = append(blocks, block)
defer func() {

View file

@ -43,6 +43,8 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/wal"
)
@ -154,6 +156,10 @@ type Options struct {
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
MaxExemplars int64
// SeriesHashCache specifies the series hash cache used when querying shards via Querier.Select().
// If nil, the cache won't be used.
SeriesHashCache *hashcache.SeriesHashCache
}
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@ -501,7 +507,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return nil, ErrClosed
default:
}
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil)
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil)
if err != nil {
return nil, err
}
@ -1058,7 +1064,7 @@ func (db *DB) reloadBlocks() (err error) {
db.mtx.Lock()
defer db.mtx.Unlock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache)
if err != nil {
return err
}
@ -1146,7 +1152,7 @@ func (db *DB) reloadBlocks() (err error) {
return nil
}
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, errors.Wrap(err, "find blocks")
@ -1163,7 +1169,12 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
// See if we already have the block in memory or open it otherwise.
block, open := getBlock(loaded, meta.ULID)
if !open {
block, err = OpenBlock(l, bDir, chunkPool)
var cacheProvider index.ReaderCacheProvider
if cache != nil {
cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
}
block, err = OpenBlock(l, bDir, chunkPool, cacheProvider)
if err != nil {
corrupted[meta.ULID] = err
continue

View file

@ -1168,7 +1168,7 @@ func TestTombstoneCleanFail(t *testing.T) {
totalBlocks := 2
for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1))
block, err := OpenBlock(nil, blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
// Add some fake tombstones to trigger the compaction.
tomb := tombstones.NewMemTombstones()
@ -1218,7 +1218,7 @@ func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
// Generate some blocks with old mint (near epoch).
for j := 0; j < totalBlocks; j++ {
blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
block, err := OpenBlock(nil, blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
// Cover block with tombstones so it can be deleted with CleanTombstones() as well.
tomb := tombstones.NewMemTombstones()
@ -1277,7 +1277,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
}
block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil, nil)
require.NoError(c.t, err)
require.NoError(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block)

View file

@ -0,0 +1,190 @@
package hashcache
import (
"sync"
"go.uber.org/atomic"
)
const (
numGenerations = 4
// approxBytesPerEntry is the estimated memory footprint (in bytes) of 1 cache
// entry, measured with TestSeriesHashCache_MeasureApproximateSizePerEntry().
approxBytesPerEntry = 28
)
// SeriesHashCache is a bounded cache mapping the per-block series ID with
// its labels hash.
type SeriesHashCache struct {
maxEntriesPerGeneration uint64
generationsMx sync.RWMutex
generations [numGenerations]cacheGeneration
}
func NewSeriesHashCache(maxBytes uint64) *SeriesHashCache {
maxEntriesPerGeneration := maxBytes / approxBytesPerEntry / numGenerations
if maxEntriesPerGeneration < 1 {
maxEntriesPerGeneration = 1
}
c := &SeriesHashCache{maxEntriesPerGeneration: maxEntriesPerGeneration}
// Init generations.
for idx := 0; idx < numGenerations; idx++ {
c.generations[idx].blocks = &sync.Map{}
c.generations[idx].length = atomic.NewUint64(0)
}
return c
}
// GetBlockCache returns a reference to the series hash cache for the provided blockID.
// The returned cache reference should be retained only for a short period (ie. the duration
// of the execution of 1 single query).
func (c *SeriesHashCache) GetBlockCache(blockID string) *BlockSeriesHashCache {
blockCache := &BlockSeriesHashCache{}
c.generationsMx.RLock()
defer c.generationsMx.RUnlock()
// Trigger a garbage collection if the current generation reached the max size.
if c.generations[0].length.Load() >= c.maxEntriesPerGeneration {
c.generationsMx.RUnlock()
c.gc()
c.generationsMx.RLock()
}
for idx := 0; idx < numGenerations; idx++ {
gen := c.generations[idx]
if value, ok := gen.blocks.Load(blockID); ok {
blockCache.generations[idx] = value.(*blockCacheGeneration)
continue
}
// Create a new per-block cache only for the current generation.
// If the cache for the older generation doesn't exist, then its
// value will be null and skipped when reading.
if idx == 0 {
value, _ := gen.blocks.LoadOrStore(blockID, newBlockCacheGeneration(gen.length))
blockCache.generations[idx] = value.(*blockCacheGeneration)
}
}
return blockCache
}
// GetBlockCacheProvider returns a cache provider bounded to the provided blockID.
func (c *SeriesHashCache) GetBlockCacheProvider(blockID string) *BlockSeriesHashCacheProvider {
return NewBlockSeriesHashCacheProvider(c, blockID)
}
func (c *SeriesHashCache) gc() {
c.generationsMx.Lock()
defer c.generationsMx.Unlock()
// Make sure no other goroutines already GCed the current generation.
if c.generations[0].length.Load() < c.maxEntriesPerGeneration {
return
}
// Shift the current generation to old.
for idx := numGenerations - 2; idx >= 0; idx-- {
c.generations[idx+1] = c.generations[idx]
}
// Initialise a new empty current generation.
c.generations[0] = cacheGeneration{
blocks: &sync.Map{},
length: atomic.NewUint64(0),
}
}
// cacheGeneration holds a multi-blocks cache generation.
type cacheGeneration struct {
// blocks maps the block ID with blockCacheGeneration.
blocks *sync.Map
// Keeps track of the number of items added to the cache. This counter
// is passed to each blockCacheGeneration belonging to this generation.
length *atomic.Uint64
}
// blockCacheGeneration holds a per-block cache generation.
type blockCacheGeneration struct {
// hashes maps per-block series ID with its hash.
hashesMx sync.RWMutex
hashes map[uint64]uint64
// Keeps track of the number of items added to the cache. This counter is
// shared with all blockCacheGeneration in the "parent" cacheGeneration.
length *atomic.Uint64
}
func newBlockCacheGeneration(length *atomic.Uint64) *blockCacheGeneration {
return &blockCacheGeneration{
hashes: make(map[uint64]uint64),
length: length,
}
}
type BlockSeriesHashCache struct {
generations [numGenerations]*blockCacheGeneration
}
// Fetch the hash of the given seriesID from the cache and returns a boolean
// whether the series was found in the cache or not.
func (c *BlockSeriesHashCache) Fetch(seriesID uint64) (uint64, bool) {
// Look for it in all generations, starting from the most recent one (index 0).
for idx := 0; idx < numGenerations; idx++ {
gen := c.generations[idx]
// Skip if the cache doesn't exist for this generation.
if gen == nil {
continue
}
gen.hashesMx.RLock()
value, ok := gen.hashes[seriesID]
gen.hashesMx.RUnlock()
if ok {
return value, true
}
}
return 0, false
}
// Store the hash of the given seriesID in the cache.
func (c *BlockSeriesHashCache) Store(seriesID, hash uint64) {
// Store it in the most recent generation (index 0).
gen := c.generations[0]
gen.hashesMx.Lock()
gen.hashes[seriesID] = hash
gen.hashesMx.Unlock()
gen.length.Add(1)
}
type BlockSeriesHashCacheProvider struct {
cache *SeriesHashCache
blockID string
}
// NewBlockSeriesHashCacheProvider makes a new BlockSeriesHashCacheProvider.
func NewBlockSeriesHashCacheProvider(cache *SeriesHashCache, blockID string) *BlockSeriesHashCacheProvider {
return &BlockSeriesHashCacheProvider{
cache: cache,
blockID: blockID,
}
}
// SeriesHashCache returns a reference to the cache bounded to block provided
// to NewBlockSeriesHashCacheProvider().
func (p *BlockSeriesHashCacheProvider) SeriesHashCache() *BlockSeriesHashCache {
return p.cache.GetBlockCache(p.blockID)
}

View file

@ -0,0 +1,134 @@
package hashcache
import (
"crypto/rand"
"fmt"
"runtime"
"strconv"
"sync"
"testing"
"github.com/oklog/ulid"
"github.com/stretchr/testify/require"
)
func TestSeriesHashCache(t *testing.T) {
// Set the max cache size to store at most 1 entry per generation,
// so that we test the GC logic too.
c := NewSeriesHashCache(numGenerations * approxBytesPerEntry)
block1 := c.GetBlockCache("1")
assertFetch(t, block1, 1, 0, false)
block1.Store(1, 100)
assertFetch(t, block1, 1, 100, true)
block2 := c.GetBlockCache("2")
assertFetch(t, block2, 1, 0, false)
block2.Store(1, 1000)
assertFetch(t, block2, 1, 1000, true)
block3 := c.GetBlockCache("3")
assertFetch(t, block1, 1, 100, true)
assertFetch(t, block2, 1, 1000, true)
assertFetch(t, block3, 1, 0, false)
// Get again the block caches.
block1 = c.GetBlockCache("1")
block2 = c.GetBlockCache("2")
block3 = c.GetBlockCache("3")
assertFetch(t, block1, 1, 100, true)
assertFetch(t, block2, 1, 1000, true)
assertFetch(t, block3, 1, 0, false)
}
func TestSeriesHashCache_MeasureApproximateSizePerEntry(t *testing.T) {
// This test measures the approximate size (in bytes) per cache entry.
// We only take in account the memory used by the map, which is the largest amount.
const numEntries = 100000
c := NewSeriesHashCache(1024 * 1024 * 1024)
b := c.GetBlockCache(ulid.MustNew(0, rand.Reader).String())
before := runtime.MemStats{}
runtime.ReadMemStats(&before)
// Preallocate the map in order to not account for re-allocations
// since we want to measure the heap utilization and not allocations.
b.generations[0].hashes = make(map[uint64]uint64, numEntries)
for i := uint64(0); i < numEntries; i++ {
b.Store(i, i)
}
after := runtime.MemStats{}
runtime.ReadMemStats(&after)
t.Logf("approximate size per entry: %d bytes", (after.TotalAlloc-before.TotalAlloc)/numEntries)
}
func TestSeriesHashCache_Concurrency(t *testing.T) {
const (
concurrency = 100
numIterations = 10000
numBlocks = 10
)
// Set the max cache size to store at most 10 entries per generation,
// so that we stress test the GC too.
c := NewSeriesHashCache(10 * numGenerations * approxBytesPerEntry)
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for n := 0; n < numIterations; n++ {
blockID := strconv.Itoa(n % numBlocks)
blockCache := c.GetBlockCache(blockID)
blockCache.Store(uint64(n), uint64(n))
actual, ok := blockCache.Fetch(uint64(n))
require.True(t, ok)
require.Equal(t, uint64(n), actual)
}
}()
}
wg.Wait()
}
func BenchmarkSeriesHashCache_StoreAndFetch(b *testing.B) {
for _, numBlocks := range []int{1, 10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("blocks=%d", numBlocks), func(b *testing.B) {
c := NewSeriesHashCache(1024 * 1024)
// In this benchmark we assume the usage pattern is calling Fetch() and Store() will be
// orders of magnitude more frequent than GetBlockCache(), so we call GetBlockCache() just
// once per block.
blockCaches := make([]*BlockSeriesHashCache, numBlocks)
for idx := 0; idx < numBlocks; idx++ {
blockCaches[idx] = c.GetBlockCache(strconv.Itoa(idx))
}
// In this benchmark we assume the ratio between Store() and Fetch() is 1:10.
storeOps := (b.N / 10) + 1
for n := 0; n < b.N; n++ {
if n < storeOps {
blockCaches[n%numBlocks].Store(uint64(n), uint64(n))
} else {
blockCaches[n%numBlocks].Fetch(uint64(n % storeOps))
}
}
})
}
}
func assertFetch(t *testing.T, c *BlockSeriesHashCache, seriesID, expectedValue uint64, expectedOk bool) {
actualValue, actualOk := c.Fetch(seriesID)
require.Equal(t, expectedValue, actualValue)
require.Equal(t, expectedOk, actualOk)
}

View file

@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/hashcache"
)
const (
@ -1042,6 +1043,10 @@ type StringIter interface {
Err() error
}
type ReaderCacheProvider interface {
SeriesHashCache() *hashcache.BlockSeriesHashCache
}
type Reader struct {
b ByteSlice
toc *TOC
@ -1062,6 +1067,9 @@ type Reader struct {
dec *Decoder
version int
// Provides a cache mapping series labels hash by series ID.
cacheProvider ReaderCacheProvider
}
type postingOffset struct {
@ -1091,17 +1099,17 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// NewReader returns a new index reader on the given byte slice. It automatically
// handles different format versions.
func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, ioutil.NopCloser(nil))
func NewReader(b ByteSlice, cacheProvider ReaderCacheProvider) (*Reader, error) {
return newReader(b, ioutil.NopCloser(nil), cacheProvider)
}
// NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) {
func NewFileReader(path string, cacheProvider ReaderCacheProvider) (*Reader, error) {
f, err := fileutil.OpenMmapFile(path)
if err != nil {
return nil, err
}
r, err := newReader(realByteSlice(f.Bytes()), f)
r, err := newReader(realByteSlice(f.Bytes()), f, cacheProvider)
if err != nil {
return nil, tsdb_errors.NewMulti(
err,
@ -1112,11 +1120,12 @@ func NewFileReader(path string) (*Reader, error) {
return r, nil
}
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Reader, error) {
r := &Reader{
b: b,
c: c,
postings: map[string][]postingOffset{},
b: b,
c: c,
postings: map[string][]postingOffset{},
cacheProvider: cacheProvider,
}
// Verify header.
@ -1715,17 +1724,41 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post
bufLbls = make(labels.Labels, 0, 10)
)
// Request the cache each time because the cache implementation requires
// that the cache reference is retained for a short period.
var seriesHashCache *hashcache.BlockSeriesHashCache
if r.cacheProvider != nil {
seriesHashCache = r.cacheProvider.SeriesHashCache()
}
for p.Next() {
id := p.At()
// Get the series labels (no chunks).
err := r.Series(id, &bufLbls, nil)
if err != nil {
return ErrPostings(errors.Errorf("series %d not found", id))
var (
hash uint64
ok bool
)
// Check if the hash is cached.
if seriesHashCache != nil {
hash, ok = seriesHashCache.Fetch(id)
}
if !ok {
// Get the series labels (no chunks).
err := r.Series(id, &bufLbls, nil)
if err != nil {
return ErrPostings(errors.Errorf("series %d not found", id))
}
hash = bufLbls.Hash()
if seriesHashCache != nil {
seriesHashCache.Store(id, hash)
}
}
// Check if the series belong to the shard.
if bufLbls.Hash()%shardCount != shardIndex {
if hash%shardCount != shardIndex {
continue
}

View file

@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/prometheus/prometheus/util/testutil"
)
@ -149,7 +150,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
require.NoError(t, err)
require.NoError(t, iw.Close())
ir, err := NewFileReader(fn)
ir, err := NewFileReader(fn, nil)
require.NoError(t, err)
require.NoError(t, ir.Close())
@ -160,7 +161,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
require.NoError(t, err)
f.Close()
_, err = NewFileReader(dir)
_, err = NewFileReader(dir, nil)
require.Error(t, err)
}
@ -199,7 +200,7 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, iw.Close())
ir, err := NewFileReader(fn)
ir, err := NewFileReader(fn, nil)
require.NoError(t, err)
p, err := ir.Postings("a", "1")
@ -245,50 +246,61 @@ func TestIndexRW_Postings(t *testing.T) {
"b": {"1", "2", "3", "4"},
}, labelIndices)
{
// List all postings for a given label value. This is what we expect to get
// in output from all shards.
p, err = ir.Postings("a", "1")
require.NoError(t, err)
// Test ShardedPostings() with and without series hash cache.
for _, cacheEnabled := range []bool{false, true} {
t.Run(fmt.Sprintf("ShardedPostings() cache enabled: %v", cacheEnabled), func(t *testing.T) {
var cache ReaderCacheProvider
if cacheEnabled {
cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
}
var expected []uint64
for p.Next() {
expected = append(expected, p.At())
}
require.NoError(t, p.Err())
require.Greater(t, len(expected), 0)
ir, err := NewFileReader(fn, cache)
require.NoError(t, err)
// Query the same postings for each shard.
const shardCount = uint64(4)
actualShards := make(map[uint64][]uint64)
actualPostings := make([]uint64, 0, len(expected))
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
// List all postings for a given label value. This is what we expect to get
// in output from all shards.
p, err = ir.Postings("a", "1")
require.NoError(t, err)
p = ir.ShardedPostings(p, shardIndex, shardCount)
var expected []uint64
for p.Next() {
ref := p.At()
actualShards[shardIndex] = append(actualShards[shardIndex], ref)
actualPostings = append(actualPostings, ref)
expected = append(expected, p.At())
}
require.NoError(t, p.Err())
}
require.Greater(t, len(expected), 0)
// We expect the postings merged out of shards is the exact same of the non sharded ones.
require.ElementsMatch(t, expected, actualPostings)
// Query the same postings for each shard.
const shardCount = uint64(4)
actualShards := make(map[uint64][]uint64)
actualPostings := make([]uint64, 0, len(expected))
// We expect the series in each shard are the expected ones.
for shardIndex, ids := range actualShards {
for _, id := range ids {
var lbls labels.Labels
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
p, err = ir.Postings("a", "1")
require.NoError(t, err)
require.NoError(t, ir.Series(id, &lbls, nil))
require.Equal(t, shardIndex, lbls.Hash()%shardCount)
p = ir.ShardedPostings(p, shardIndex, shardCount)
for p.Next() {
ref := p.At()
actualShards[shardIndex] = append(actualShards[shardIndex], ref)
actualPostings = append(actualPostings, ref)
}
require.NoError(t, p.Err())
}
}
// We expect the postings merged out of shards is the exact same of the non sharded ones.
require.ElementsMatch(t, expected, actualPostings)
// We expect the series in each shard are the expected ones.
for shardIndex, ids := range actualShards {
for _, id := range ids {
var lbls labels.Labels
require.NoError(t, ir.Series(id, &lbls, nil))
require.Equal(t, shardIndex, lbls.Hash()%shardCount)
}
}
})
}
require.NoError(t, ir.Close())
@ -331,7 +343,7 @@ func TestPostingsMany(t *testing.T) {
}
require.NoError(t, iw.Close())
ir, err := NewFileReader(fn)
ir, err := NewFileReader(fn, nil)
require.NoError(t, err)
defer func() { require.NoError(t, ir.Close()) }()
@ -469,7 +481,7 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close()
require.NoError(t, err)
ir, err := NewFileReader(filepath.Join(dir, indexFilename))
ir, err := NewFileReader(filepath.Join(dir, indexFilename), nil)
require.NoError(t, err)
for p := range mi.postings {
@ -541,7 +553,7 @@ func TestDecbufUvarintWithInvalidBuffer(t *testing.T) {
func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
_, err := NewReader(b)
_, err := NewReader(b, nil)
require.Error(t, err)
}
@ -553,7 +565,7 @@ func TestNewFileReaderErrorNoOpenFiles(t *testing.T) {
err := ioutil.WriteFile(idxName, []byte("corrupted contents"), 0666)
require.NoError(t, err)
_, err = NewFileReader(idxName)
_, err = NewFileReader(idxName, nil)
require.Error(t, err)
// dir.Close will fail on Win if idxName fd is not closed on error path.
@ -606,3 +618,59 @@ func TestSymbols(t *testing.T) {
}
require.NoError(t, iter.Err())
}
func BenchmarkReader_ShardedPostings(b *testing.B) {
const (
numSeries = 10000
numShards = 16
)
dir, err := ioutil.TempDir("", "benchmark_reader_sharded_postings")
require.NoError(b, err)
defer func() {
require.NoError(b, os.RemoveAll(dir))
}()
// Generate an index.
fn := filepath.Join(dir, indexFilename)
iw, err := NewWriter(context.Background(), fn)
require.NoError(b, err)
for i := 1; i <= numSeries; i++ {
require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i)))
}
require.NoError(b, iw.AddSymbol("const"))
require.NoError(b, iw.AddSymbol("unique"))
for i := 1; i <= numSeries; i++ {
require.NoError(b, iw.AddSeries(uint64(i), labels.Labels{
{Name: "const", Value: fmt.Sprintf("%10d", 1)},
{Name: "unique", Value: fmt.Sprintf("%10d", i)},
}))
}
require.NoError(b, iw.Close())
for _, cacheEnabled := range []bool{true, false} {
b.Run(fmt.Sprintf("cached enabled: %v", cacheEnabled), func(b *testing.B) {
var cache ReaderCacheProvider
if cacheEnabled {
cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
}
// Create a reader to read back all postings from the index.
ir, err := NewFileReader(fn, cache)
require.NoError(b, err)
b.ResetTimer()
for n := 0; n < b.N; n++ {
allPostings, err := ir.Postings("const", fmt.Sprintf("%10d", 1))
require.NoError(b, err)
ir.ShardedPostings(allPostings, uint64(n%numShards), numShards)
}
})
}
}

View file

@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/hashcache"
)
// Make entries ~50B in size, to emulate real-world high cardinality.
@ -76,7 +78,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
}()
blockdir := createBlockFromHead(b, tmpdir, h)
block, err := OpenBlock(nil, blockdir, nil)
block, err := OpenBlock(nil, blockdir, nil, nil)
require.NoError(b, err)
defer func() {
require.NoError(b, block.Close())
@ -162,16 +164,28 @@ func BenchmarkQuerierSelect(b *testing.B) {
}
require.NoError(b, app.Commit())
bench := func(b *testing.B, br BlockReader, sorted bool) {
bench := func(b *testing.B, br BlockReader, sorted bool, sharding bool) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
for s := 1; s <= numSeries; s *= 10 {
b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
q, err := NewBlockQuerier(br, 0, int64(s-1))
mint := int64(0)
maxt := int64(s - 1)
q, err := NewBlockQuerier(br, mint, maxt)
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
ss := q.Select(sorted, nil, matcher)
var hints *storage.SelectHints
if sharding {
hints = &storage.SelectHints{
Start: mint,
End: maxt,
ShardIndex: uint64(i % 16),
ShardCount: 16,
}
}
ss := q.Select(sorted, hints, matcher)
for ss.Next() {
}
require.NoError(b, ss.Err())
@ -182,10 +196,20 @@ func BenchmarkQuerierSelect(b *testing.B) {
}
b.Run("Head", func(b *testing.B) {
bench(b, h, false)
b.Run("without sharding", func(b *testing.B) {
bench(b, h, false, false)
})
b.Run("with sharding", func(b *testing.B) {
bench(b, h, false, true)
})
})
b.Run("SortedHead", func(b *testing.B) {
bench(b, h, true)
b.Run("without sharding", func(b *testing.B) {
bench(b, h, true, false)
})
b.Run("with sharding", func(b *testing.B) {
bench(b, h, true, true)
})
})
tmpdir, err := ioutil.TempDir("", "test_benchquerierselect")
@ -194,14 +218,20 @@ func BenchmarkQuerierSelect(b *testing.B) {
require.NoError(b, os.RemoveAll(tmpdir))
}()
seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
blockdir := createBlockFromHead(b, tmpdir, h)
block, err := OpenBlock(nil, blockdir, nil)
block, err := OpenBlock(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"))
require.NoError(b, err)
defer func() {
require.NoError(b, block.Close())
}()
b.Run("Block", func(b *testing.B) {
bench(b, block, false)
b.Run("without sharding", func(b *testing.B) {
bench(b, block, false, false)
})
b.Run("with sharding", func(b *testing.B) {
bench(b, block, false, true)
})
})
}

View file

@ -1323,7 +1323,7 @@ func BenchmarkQueryIterator(b *testing.B) {
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil)
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err)
blocks = append(blocks, block)
defer block.Close()
@ -1390,7 +1390,7 @@ func BenchmarkQuerySeek(b *testing.B) {
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil)
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err)
blocks = append(blocks, block)
defer block.Close()
@ -1529,7 +1529,7 @@ func BenchmarkSetMatcher(b *testing.B) {
} else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt)
}
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil)
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err)
blocks = append(blocks, block)
defer block.Close()
@ -1984,7 +1984,7 @@ func BenchmarkQueries(b *testing.B) {
qs := make([]storage.Querier, 0, 10)
for x := 0; x <= 10; x++ {
block, err := OpenBlock(nil, createBlock(b, dir, series), nil)
block, err := OpenBlock(nil, createBlock(b, dir, series), nil, nil)
require.NoError(b, err)
q, err := NewBlockQuerier(block, 1, int64(nSamples))
require.NoError(b, err)

View file

@ -81,7 +81,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
require.NoError(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0777))
// Read current index to check integrity.
r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), nil)
require.NoError(t, err)
p, err := r.Postings("b", "1")
require.NoError(t, err)
@ -99,7 +99,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
require.NoError(t, err)
db.Close()
r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), nil)
require.NoError(t, err)
defer r.Close()
p, err = r.Postings("b", "1")