diff --git a/tsdb/block.go b/tsdb/block.go index 05fc7cbf40..78396caa60 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -283,7 +283,7 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) { if logger == nil { logger = log.NewNopLogger() } @@ -304,7 +304,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er } closers = append(closers, cr) - ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename), cache) if err != nil { return nil, err } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 54cfbc2c48..677004ebc6 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -64,14 +64,14 @@ func TestSetCompactionFailed(t *testing.T) { }() blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) - b, err := OpenBlock(nil, blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) require.Equal(t, false, b.meta.Compaction.Failed) require.NoError(t, b.setCompactionFailed()) require.Equal(t, true, b.meta.Compaction.Failed) require.NoError(t, b.Close()) - b, err = OpenBlock(nil, blockDir, nil) + b, err = OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) require.Equal(t, true, b.meta.Compaction.Failed) require.NoError(t, b.Close()) @@ -83,7 +83,7 @@ func TestCreateBlock(t *testing.T) { defer func() { require.NoError(t, os.RemoveAll(tmpdir)) }() - b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil) + b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil, nil) if err == nil { require.NoError(t, b.Close()) } @@ -193,7 +193,7 @@ func TestCorruptedChunk(t *testing.T) { 
require.NoError(t, f.Close()) // Check open err. - b, err := OpenBlock(nil, blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil, nil) if tc.openErr != nil { require.Equal(t, tc.openErr.Error(), err.Error()) return @@ -235,7 +235,7 @@ func TestLabelValuesWithMatchers(t *testing.T) { require.Greater(t, len(files), 0, "No chunk created.") // Check open err. - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, block.Close()) }() @@ -303,7 +303,7 @@ func TestBlockSize(t *testing.T) { // Create a block and compare the reported size vs actual disk size. { blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100)) - blockInit, err = OpenBlock(nil, blockDirInit, nil) + blockInit, err = OpenBlock(nil, blockDirInit, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, blockInit.Close()) @@ -327,7 +327,7 @@ func TestBlockSize(t *testing.T) { require.NoError(t, err) blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) - blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil) + blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil, nil) require.NoError(t, err) defer func() { require.NoError(t, blockAfterCompact.Close()) @@ -358,7 +358,7 @@ func TestReadIndexFormatV1(t *testing.T) { */ blockDir := filepath.Join("testdata", "index_format_v1") - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) q, err := NewBlockQuerier(block, 0, 1000) @@ -398,7 +398,7 @@ func BenchmarkLabelValuesWithMatchers(b *testing.B) { require.Greater(b, len(files), 0, "No chunk created.") // Check open err. 
- block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(b, err) defer func() { require.NoError(b, block.Close()) }() @@ -452,7 +452,7 @@ func TestLabelNamesWithMatchers(t *testing.T) { require.Greater(t, len(files), 0, "No chunk created.") // Check open err. - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, block.Close()) }) diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go index 07b500d7ae..410f5730d8 100644 --- a/tsdb/blockwriter_test.go +++ b/tsdb/blockwriter_test.go @@ -50,7 +50,7 @@ func TestBlockWriter(t *testing.T) { // Confirm the block has the correct data. blockpath := filepath.Join(outputDir, id.String()) - b, err := OpenBlock(nil, blockpath, nil) + b, err := OpenBlock(nil, blockpath, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, b.Close()) }() q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64) diff --git a/tsdb/compact.go b/tsdb/compact.go index b2ae7e4ea5..656a67eeec 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -418,7 +418,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(c.logger, d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool, nil) if err != nil { return uid, err } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index e30f2b190f..0528ec8bb6 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1056,7 +1056,7 @@ func BenchmarkCompaction(b *testing.B) { blockDirs := make([]string, 0, len(c.ranges)) var blocks []*Block for _, r := range c.ranges { - block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil) + block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer func() { diff --git 
a/tsdb/db.go b/tsdb/db.go index 9d17b406b9..337192e6f9 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -43,6 +43,8 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met. + "github.com/prometheus/prometheus/tsdb/hashcache" + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/wal" ) @@ -154,6 +156,10 @@ type Options struct { // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. MaxExemplars int64 + + // SeriesHashCache specifies the series hash cache used when querying shards via Querier.Select(). + // If nil, the cache won't be used. + SeriesHashCache *hashcache.SeriesHashCache } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -501,7 +507,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { return nil, ErrClosed default: } - loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil) if err != nil { return nil, err } @@ -1058,7 +1064,7 @@ func (db *DB) reloadBlocks() (err error) { db.mtx.Lock() defer db.mtx.Unlock() - loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache) if err != nil { return err } @@ -1146,7 +1152,7 @@ func (db *DB) reloadBlocks() (err error) { return nil } -func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { +func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache 
*hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { bDirs, err := blockDirs(dir) if err != nil { return nil, nil, errors.Wrap(err, "find blocks") @@ -1163,7 +1169,12 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po // See if we already have the block in memory or open it otherwise. block, open := getBlock(loaded, meta.ULID) if !open { - block, err = OpenBlock(l, bDir, chunkPool) + var cacheProvider index.ReaderCacheProvider + if cache != nil { + cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String()) + } + + block, err = OpenBlock(l, bDir, chunkPool, cacheProvider) if err != nil { corrupted[meta.ULID] = err continue diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 876a13920c..f071de998d 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1168,7 +1168,7 @@ func TestTombstoneCleanFail(t *testing.T) { totalBlocks := 2 for i := 0; i < totalBlocks; i++ { blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1)) - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) // Add some fake tombstones to trigger the compaction. tomb := tombstones.NewMemTombstones() @@ -1218,7 +1218,7 @@ func TestTombstoneCleanRetentionLimitsRace(t *testing.T) { // Generate some blocks with old mint (near epoch). for j := 0; j < totalBlocks; j++ { blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) - block, err := OpenBlock(nil, blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil, nil) require.NoError(t, err) // Cover block with tombstones so it can be deleted with CleanTombstones() as well. 
tomb := tombstones.NewMemTombstones() @@ -1277,7 +1277,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil, nil) require.NoError(c.t, err) require.NoError(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block)
diff --git a/tsdb/hashcache/series_hash_cache.go b/tsdb/hashcache/series_hash_cache.go
new file mode 100644
index 0000000000..51236623d5
--- /dev/null
+++ b/tsdb/hashcache/series_hash_cache.go
@@ -0,0 +1,221 @@
+package hashcache
+
+import (
+	"sync"
+
+	"go.uber.org/atomic"
+)
+
+const (
+	// numGenerations is the number of generations the cache is split into. New
+	// entries are always stored in the generation at index 0, while the oldest
+	// generation is dropped each time the cache is garbage collected.
+	numGenerations = 4
+
+	// approxBytesPerEntry is the estimated memory footprint (in bytes) of 1 cache
+	// entry, measured with TestSeriesHashCache_MeasureApproximateSizePerEntry().
+	approxBytesPerEntry = 28
+)
+
+// SeriesHashCache is a bounded cache mapping the per-block series ID with
+// its labels hash.
+type SeriesHashCache struct {
+	maxEntriesPerGeneration uint64
+
+	generationsMx sync.RWMutex
+	generations   [numGenerations]cacheGeneration
+}
+
+// NewSeriesHashCache makes a new SeriesHashCache sized to use approximately
+// maxBytes of memory. The budget is split evenly across the generations and
+// each generation is always allowed to hold at least 1 entry.
+func NewSeriesHashCache(maxBytes uint64) *SeriesHashCache {
+	maxEntriesPerGeneration := maxBytes / approxBytesPerEntry / numGenerations
+	if maxEntriesPerGeneration < 1 {
+		maxEntriesPerGeneration = 1
+	}
+
+	c := &SeriesHashCache{maxEntriesPerGeneration: maxEntriesPerGeneration}
+
+	// Init generations.
+	for idx := 0; idx < numGenerations; idx++ {
+		c.generations[idx].blocks = &sync.Map{}
+		c.generations[idx].length = atomic.NewUint64(0)
+	}
+
+	return c
+}
+
+// GetBlockCache returns a reference to the series hash cache for the provided blockID.
+// The returned cache reference should be retained only for a short period (ie. the duration
+// of the execution of 1 single query).
+func (c *SeriesHashCache) GetBlockCache(blockID string) *BlockSeriesHashCache {
+	blockCache := &BlockSeriesHashCache{}
+
+	c.generationsMx.RLock()
+	defer c.generationsMx.RUnlock()
+
+	// Trigger a garbage collection if the current generation reached the max size.
+	if c.generations[0].length.Load() >= c.maxEntriesPerGeneration {
+		// gc() acquires the write lock, so the read lock has to be released
+		// before calling it and re-acquired afterwards to keep the deferred
+		// RUnlock() balanced.
+		c.generationsMx.RUnlock()
+		c.gc()
+		c.generationsMx.RLock()
+	}
+
+	for idx := 0; idx < numGenerations; idx++ {
+		gen := c.generations[idx]
+
+		if value, ok := gen.blocks.Load(blockID); ok {
+			blockCache.generations[idx] = value.(*blockCacheGeneration)
+			continue
+		}
+
+		// Create a new per-block cache only for the current generation.
+		// If the cache for the older generation doesn't exist, then its
+		// value will be nil and skipped when reading.
+		if idx == 0 {
+			value, _ := gen.blocks.LoadOrStore(blockID, newBlockCacheGeneration(gen.length))
+			blockCache.generations[idx] = value.(*blockCacheGeneration)
+		}
+	}
+
+	return blockCache
+}
+
+// GetBlockCacheProvider returns a cache provider bound to the provided blockID.
+func (c *SeriesHashCache) GetBlockCacheProvider(blockID string) *BlockSeriesHashCacheProvider {
+	return NewBlockSeriesHashCacheProvider(c, blockID)
+}
+
+// gc drops the oldest generation, shifts the remaining ones by one position
+// and installs a new empty current generation. It's a no-op if the current
+// generation has not reached the max size.
+func (c *SeriesHashCache) gc() {
+	c.generationsMx.Lock()
+	defer c.generationsMx.Unlock()
+
+	// Make sure no other goroutines already GCed the current generation.
+	if c.generations[0].length.Load() < c.maxEntriesPerGeneration {
+		return
+	}
+
+	// Shift the current generation to old.
+	for idx := numGenerations - 2; idx >= 0; idx-- {
+		c.generations[idx+1] = c.generations[idx]
+	}
+
+	// Initialise a new empty current generation.
+	c.generations[0] = cacheGeneration{
+		blocks: &sync.Map{},
+		length: atomic.NewUint64(0),
+	}
+}
+
+// cacheGeneration holds a multi-blocks cache generation.
+type cacheGeneration struct {
+	// blocks maps the block ID with blockCacheGeneration.
+	blocks *sync.Map
+
+	// Keeps track of the number of items added to the cache. This counter
+	// is passed to each blockCacheGeneration belonging to this generation.
+	length *atomic.Uint64
+}
+
+// blockCacheGeneration holds a per-block cache generation.
+type blockCacheGeneration struct {
+	// hashes maps per-block series ID with its hash. It's guarded by hashesMx.
+	hashesMx sync.RWMutex
+	hashes   map[uint64]uint64
+
+	// Keeps track of the number of items added to the cache. This counter is
+	// shared with all blockCacheGeneration in the "parent" cacheGeneration.
+	length *atomic.Uint64
+}
+
+// newBlockCacheGeneration makes an empty per-block cache generation which
+// accounts its entries in the provided (shared) length counter.
+func newBlockCacheGeneration(length *atomic.Uint64) *blockCacheGeneration {
+	return &blockCacheGeneration{
+		hashes: make(map[uint64]uint64),
+		length: length,
+	}
+}
+
+// BlockSeriesHashCache is a view over the series hash cache of a single block,
+// holding a reference to the block's cache for each generation. A nil entry
+// means the block has no cache in that generation.
+type BlockSeriesHashCache struct {
+	generations [numGenerations]*blockCacheGeneration
+}
+
+// Fetch the hash of the given seriesID from the cache and returns a boolean
+// whether the series was found in the cache or not.
+func (c *BlockSeriesHashCache) Fetch(seriesID uint64) (uint64, bool) {
+	// Look for it in all generations, starting from the most recent one (index 0).
+	for idx := 0; idx < numGenerations; idx++ {
+		gen := c.generations[idx]
+
+		// Skip if the cache doesn't exist for this generation.
+		if gen == nil {
+			continue
+		}
+
+		gen.hashesMx.RLock()
+		value, ok := gen.hashes[seriesID]
+		gen.hashesMx.RUnlock()
+
+		if ok {
+			return value, true
+		}
+	}
+
+	return 0, false
+}
+
+// Store the hash of the given seriesID in the most recent generation of the
+// cache. It's a no-op if the cache has no current generation (eg. when called
+// on the zero value instead of a reference returned by GetBlockCache()).
+func (c *BlockSeriesHashCache) Store(seriesID, hash uint64) {
+	// Store it in the most recent generation (index 0).
+	gen := c.generations[0]
+	if gen == nil {
+		return
+	}
+
+	gen.hashesMx.Lock()
+	_, exists := gen.hashes[seriesID]
+	gen.hashes[seriesID] = hash
+	gen.hashesMx.Unlock()
+
+	// Count the entry only when it's new. Overwriting an existing one (eg.
+	// concurrent queries storing the hash of the same series) doesn't grow
+	// the map, and counting it would trigger the GC of a generation which
+	// isn't actually full.
+	if !exists {
+		gen.length.Add(1)
+	}
+}
+
+// BlockSeriesHashCacheProvider provides the series hash cache of a single
+// block, looking it up in the parent SeriesHashCache each time it's requested.
+type BlockSeriesHashCacheProvider struct {
+	cache   *SeriesHashCache
+	blockID string
+}
+
+// NewBlockSeriesHashCacheProvider makes a new BlockSeriesHashCacheProvider.
+func NewBlockSeriesHashCacheProvider(cache *SeriesHashCache, blockID string) *BlockSeriesHashCacheProvider {
+	return &BlockSeriesHashCacheProvider{
+		cache:   cache,
+		blockID: blockID,
+	}
+}
+
+// SeriesHashCache returns a reference to the cache bound to the block provided
+// to NewBlockSeriesHashCacheProvider().
+func (p *BlockSeriesHashCacheProvider) SeriesHashCache() *BlockSeriesHashCache {
+	return p.cache.GetBlockCache(p.blockID)
+}
diff --git a/tsdb/hashcache/series_hash_cache_test.go b/tsdb/hashcache/series_hash_cache_test.go
new file mode 100644
index 0000000000..e04c353878
--- /dev/null
+++ b/tsdb/hashcache/series_hash_cache_test.go
@@ -0,0 +1,145 @@
+package hashcache
+
+import (
+	"crypto/rand"
+	"fmt"
+	"runtime"
+	"strconv"
+	"sync"
+	"testing"
+
+	"github.com/oklog/ulid"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSeriesHashCache(t *testing.T) {
+	// Set the max cache size to store at most 1 entry per generation,
+	// so that we test the GC logic too.
+	c := NewSeriesHashCache(numGenerations * approxBytesPerEntry)
+
+	block1 := c.GetBlockCache("1")
+	assertFetch(t, block1, 1, 0, false)
+	block1.Store(1, 100)
+	assertFetch(t, block1, 1, 100, true)
+
+	block2 := c.GetBlockCache("2")
+	assertFetch(t, block2, 1, 0, false)
+	block2.Store(1, 1000)
+	assertFetch(t, block2, 1, 1000, true)
+
+	block3 := c.GetBlockCache("3")
+	assertFetch(t, block1, 1, 100, true)
+	assertFetch(t, block2, 1, 1000, true)
+	assertFetch(t, block3, 1, 0, false)
+
+	// Get again the block caches.
+	block1 = c.GetBlockCache("1")
+	block2 = c.GetBlockCache("2")
+	block3 = c.GetBlockCache("3")
+
+	assertFetch(t, block1, 1, 100, true)
+	assertFetch(t, block2, 1, 1000, true)
+	assertFetch(t, block3, 1, 0, false)
+}
+
+func TestSeriesHashCache_MeasureApproximateSizePerEntry(t *testing.T) {
+	// This test measures the approximate size (in bytes) per cache entry.
+	// We only take in account the memory used by the map, which is the largest amount.
+	const numEntries = 100000
+	c := NewSeriesHashCache(1024 * 1024 * 1024)
+	b := c.GetBlockCache(ulid.MustNew(0, rand.Reader).String())
+
+	before := runtime.MemStats{}
+	runtime.ReadMemStats(&before)
+
+	// Preallocate the map in order to not account for re-allocations
+	// since we want to measure the heap utilization and not allocations.
+	b.generations[0].hashes = make(map[uint64]uint64, numEntries)
+
+	for i := uint64(0); i < numEntries; i++ {
+		b.Store(i, i)
+	}
+
+	after := runtime.MemStats{}
+	runtime.ReadMemStats(&after)
+
+	t.Logf("approximate size per entry: %d bytes", (after.TotalAlloc-before.TotalAlloc)/numEntries)
+}
+
+func TestSeriesHashCache_Concurrency(t *testing.T) {
+	const (
+		concurrency   = 100
+		numIterations = 10000
+		numBlocks     = 10
+	)
+
+	// Set the max cache size to store at most 10 entries per generation,
+	// so that we stress test the GC too.
+	c := NewSeriesHashCache(10 * numGenerations * approxBytesPerEntry)
+
+	wg := sync.WaitGroup{}
+	wg.Add(concurrency)
+
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			defer wg.Done()
+
+			for n := 0; n < numIterations; n++ {
+				blockID := strconv.Itoa(n % numBlocks)
+
+				blockCache := c.GetBlockCache(blockID)
+				blockCache.Store(uint64(n), uint64(n))
+				actual, ok := blockCache.Fetch(uint64(n))
+
+				// FailNow() (used by the require package) must be called from the
+				// goroutine running the test, so failures are reported with
+				// Errorf() here and the worker is stopped.
+				if !ok || actual != uint64(n) {
+					t.Errorf("block %s: fetched hash %d (found: %v) for series %d, expected %d", blockID, actual, ok, n, n)
+					return
+				}
+			}
+		}()
+	}
+
+	wg.Wait()
+}
+
+func BenchmarkSeriesHashCache_StoreAndFetch(b *testing.B) {
+	for _, numBlocks := range []int{1, 10, 100, 1000, 10000} {
+		b.Run(fmt.Sprintf("blocks=%d", numBlocks), func(b *testing.B) {
+			c := NewSeriesHashCache(1024 * 1024)
+
+			// In this benchmark we assume the usage pattern is calling Fetch() and Store() will be
+			// orders of magnitude more frequent than GetBlockCache(), so we call GetBlockCache() just
+			// once per block.
+			blockCaches := make([]*BlockSeriesHashCache, numBlocks)
+			for idx := 0; idx < numBlocks; idx++ {
+				blockCaches[idx] = c.GetBlockCache(strconv.Itoa(idx))
+			}
+
+			// In this benchmark we assume the ratio between Store() and Fetch() is 1:10.
+			storeOps := (b.N / 10) + 1
+
+			// Don't account for the setup above in the measurement.
+			b.ResetTimer()
+
+			for n := 0; n < b.N; n++ {
+				if n < storeOps {
+					blockCaches[n%numBlocks].Store(uint64(n), uint64(n))
+				} else {
+					blockCaches[n%numBlocks].Fetch(uint64(n % storeOps))
+				}
+			}
+		})
+	}
+}
+
+// assertFetch asserts on the result of fetching seriesID from the cache c.
+func assertFetch(t *testing.T, c *BlockSeriesHashCache, seriesID, expectedValue uint64, expectedOk bool) {
+	t.Helper()
+
+	actualValue, actualOk := c.Fetch(seriesID)
+	require.Equal(t, expectedValue, actualValue)
+	require.Equal(t, expectedOk, actualOk)
+}
diff --git a/tsdb/index/index.go b/tsdb/index/index.go index b46c107e9c..d45a4220ec 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/hashcache" ) const ( @@ -1042,6 +1043,10 @@ type StringIter interface { Err() error } +type ReaderCacheProvider interface { + SeriesHashCache() *hashcache.BlockSeriesHashCache +} + type Reader struct { b ByteSlice toc *TOC @@ -1062,6 +1067,9 @@ type Reader struct { dec *Decoder version int + + // Provides a cache mapping series labels hash by series ID. + cacheProvider ReaderCacheProvider } type postingOffset struct { @@ -1091,17 +1099,17 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions.
-func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, ioutil.NopCloser(nil)) +func NewReader(b ByteSlice, cacheProvider ReaderCacheProvider) (*Reader, error) { + return newReader(b, ioutil.NopCloser(nil), cacheProvider) } // NewFileReader returns a new index reader against the given index file. -func NewFileReader(path string) (*Reader, error) { +func NewFileReader(path string, cacheProvider ReaderCacheProvider) (*Reader, error) { f, err := fileutil.OpenMmapFile(path) if err != nil { return nil, err } - r, err := newReader(realByteSlice(f.Bytes()), f) + r, err := newReader(realByteSlice(f.Bytes()), f, cacheProvider) if err != nil { return nil, tsdb_errors.NewMulti( err, @@ -1112,11 +1120,12 @@ func NewFileReader(path string) (*Reader, error) { return r, nil } -func newReader(b ByteSlice, c io.Closer) (*Reader, error) { +func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Reader, error) { r := &Reader{ - b: b, - c: c, - postings: map[string][]postingOffset{}, + b: b, + c: c, + postings: map[string][]postingOffset{}, + cacheProvider: cacheProvider, } // Verify header. @@ -1715,17 +1724,41 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post bufLbls = make(labels.Labels, 0, 10) ) + // Request the cache each time because the cache implementation requires + // that the cache reference is retained for a short period. + var seriesHashCache *hashcache.BlockSeriesHashCache + if r.cacheProvider != nil { + seriesHashCache = r.cacheProvider.SeriesHashCache() + } + for p.Next() { id := p.At() - // Get the series labels (no chunks). - err := r.Series(id, &bufLbls, nil) - if err != nil { - return ErrPostings(errors.Errorf("series %d not found", id)) + var ( + hash uint64 + ok bool + ) + + // Check if the hash is cached. + if seriesHashCache != nil { + hash, ok = seriesHashCache.Fetch(id) + } + + if !ok { + // Get the series labels (no chunks). 
+ err := r.Series(id, &bufLbls, nil) + if err != nil { + return ErrPostings(errors.Errorf("series %d not found", id)) + } + + hash = bufLbls.Hash() + if seriesHashCache != nil { + seriesHashCache.Store(id, hash) + } } // Check if the series belong to the shard. - if bufLbls.Hash()%shardCount != shardIndex { + if hash%shardCount != shardIndex { continue } diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index ee7a404b77..9bf2e30330 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" + "github.com/prometheus/prometheus/tsdb/hashcache" "github.com/prometheus/prometheus/util/testutil" ) @@ -149,7 +150,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err) require.NoError(t, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, nil) require.NoError(t, err) require.NoError(t, ir.Close()) @@ -160,7 +161,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err) f.Close() - _, err = NewFileReader(dir) + _, err = NewFileReader(dir, nil) require.Error(t, err) } @@ -199,7 +200,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, nil) require.NoError(t, err) p, err := ir.Postings("a", "1") @@ -245,50 +246,61 @@ func TestIndexRW_Postings(t *testing.T) { "b": {"1", "2", "3", "4"}, }, labelIndices) - { - // List all postings for a given label value. This is what we expect to get - // in output from all shards. - p, err = ir.Postings("a", "1") - require.NoError(t, err) + // Test ShardedPostings() with and without series hash cache. 
+ for _, cacheEnabled := range []bool{false, true} { + t.Run(fmt.Sprintf("ShardedPostings() cache enabled: %v", cacheEnabled), func(t *testing.T) { + var cache ReaderCacheProvider + if cacheEnabled { + cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test") + } - var expected []uint64 - for p.Next() { - expected = append(expected, p.At()) - } - require.NoError(t, p.Err()) - require.Greater(t, len(expected), 0) + ir, err := NewFileReader(fn, cache) + require.NoError(t, err) - // Query the same postings for each shard. - const shardCount = uint64(4) - actualShards := make(map[uint64][]uint64) - actualPostings := make([]uint64, 0, len(expected)) - - for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { + // List all postings for a given label value. This is what we expect to get + // in output from all shards. p, err = ir.Postings("a", "1") require.NoError(t, err) - p = ir.ShardedPostings(p, shardIndex, shardCount) + var expected []uint64 for p.Next() { - ref := p.At() - - actualShards[shardIndex] = append(actualShards[shardIndex], ref) - actualPostings = append(actualPostings, ref) + expected = append(expected, p.At()) } require.NoError(t, p.Err()) - } + require.Greater(t, len(expected), 0) - // We expect the postings merged out of shards is the exact same of the non sharded ones. - require.ElementsMatch(t, expected, actualPostings) + // Query the same postings for each shard. + const shardCount = uint64(4) + actualShards := make(map[uint64][]uint64) + actualPostings := make([]uint64, 0, len(expected)) - // We expect the series in each shard are the expected ones. 
- for shardIndex, ids := range actualShards { - for _, id := range ids { - var lbls labels.Labels + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { + p, err = ir.Postings("a", "1") + require.NoError(t, err) - require.NoError(t, ir.Series(id, &lbls, nil)) - require.Equal(t, shardIndex, lbls.Hash()%shardCount) + p = ir.ShardedPostings(p, shardIndex, shardCount) + for p.Next() { + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) + } + require.NoError(t, p.Err()) } - } + + // We expect the postings merged out of shards is the exact same of the non sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard are the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.Labels + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, lbls.Hash()%shardCount) + } + } + }) } require.NoError(t, ir.Close()) @@ -331,7 +343,7 @@ func TestPostingsMany(t *testing.T) { } require.NoError(t, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, nil) require.NoError(t, err) defer func() { require.NoError(t, ir.Close()) }() @@ -469,7 +481,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() require.NoError(t, err) - ir, err := NewFileReader(filepath.Join(dir, indexFilename)) + ir, err := NewFileReader(filepath.Join(dir, indexFilename), nil) require.NoError(t, err) for p := range mi.postings { @@ -541,7 +553,7 @@ func TestDecbufUvarintWithInvalidBuffer(t *testing.T) { func TestReaderWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) - _, err := NewReader(b) + _, err := NewReader(b, nil) require.Error(t, err) } @@ -553,7 +565,7 @@ func TestNewFileReaderErrorNoOpenFiles(t *testing.T) { err := ioutil.WriteFile(idxName, []byte("corrupted contents"), 0666) require.NoError(t, err) - _, err 
= NewFileReader(idxName) + _, err = NewFileReader(idxName, nil) require.Error(t, err) // dir.Close will fail on Win if idxName fd is not closed on error path.
@@ -606,3 +618,67 @@ func TestSymbols(t *testing.T) {
 	}
 	require.NoError(t, iter.Err())
 }
+
+// BenchmarkReader_ShardedPostings measures Reader.ShardedPostings() over an
+// index of numSeries series, with and without the series hash cache.
+func BenchmarkReader_ShardedPostings(b *testing.B) {
+	const (
+		numSeries = 10000
+		numShards = 16
+	)
+
+	dir, err := ioutil.TempDir("", "benchmark_reader_sharded_postings")
+	require.NoError(b, err)
+	defer func() {
+		require.NoError(b, os.RemoveAll(dir))
+	}()
+
+	// Generate an index.
+	fn := filepath.Join(dir, indexFilename)
+
+	iw, err := NewWriter(context.Background(), fn)
+	require.NoError(b, err)
+
+	for i := 1; i <= numSeries; i++ {
+		require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i)))
+	}
+	require.NoError(b, iw.AddSymbol("const"))
+	require.NoError(b, iw.AddSymbol("unique"))
+
+	for i := 1; i <= numSeries; i++ {
+		require.NoError(b, iw.AddSeries(uint64(i), labels.Labels{
+			{Name: "const", Value: fmt.Sprintf("%10d", 1)},
+			{Name: "unique", Value: fmt.Sprintf("%10d", i)},
+		}))
+	}
+
+	require.NoError(b, iw.Close())
+
+	for _, cacheEnabled := range []bool{true, false} {
+		b.Run(fmt.Sprintf("cached enabled: %v", cacheEnabled), func(b *testing.B) {
+			var cache ReaderCacheProvider
+			if cacheEnabled {
+				cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
+			}
+
+			// Create a reader to read back all postings from the index.
+			ir, err := NewFileReader(fn, cache)
+			require.NoError(b, err)
+			defer func() {
+				// Release the index file, otherwise each sub-benchmark run leaks a reader.
+				require.NoError(b, ir.Close())
+			}()
+
+			b.ResetTimer()
+
+			for n := 0; n < b.N; n++ {
+				allPostings, err := ir.Postings("const", fmt.Sprintf("%10d", 1))
+				require.NoError(b, err)
+
+				// Make sure we're not measuring a failing lookup.
+				shard := ir.ShardedPostings(allPostings, uint64(n%numShards), numShards)
+				require.NoError(b, shard.Err())
+			}
+		})
+	}
+}
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index ed2d6fb4b1..1edbc582da 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -24,6 +24,8 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/hashcache" ) // Make entries ~50B in size, to emulate real-world high cardinality. @@ -76,7 +78,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { }() blockdir := createBlockFromHead(b, tmpdir, h) - block, err := OpenBlock(nil, blockdir, nil) + block, err := OpenBlock(nil, blockdir, nil, nil) require.NoError(b, err) defer func() { require.NoError(b, block.Close()) @@ -162,16 +164,28 @@ func BenchmarkQuerierSelect(b *testing.B) { } require.NoError(b, app.Commit()) - bench := func(b *testing.B, br BlockReader, sorted bool) { + bench := func(b *testing.B, br BlockReader, sorted bool, sharding bool) { matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") for s := 1; s <= numSeries; s *= 10 { b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { - q, err := NewBlockQuerier(br, 0, int64(s-1)) + mint := int64(0) + maxt := int64(s - 1) + q, err := NewBlockQuerier(br, mint, maxt) require.NoError(b, err) b.ResetTimer() for i := 0; i < b.N; i++ { - ss := q.Select(sorted, nil, matcher) + var hints *storage.SelectHints + if sharding { + hints = &storage.SelectHints{ + Start: mint, + End: maxt, + ShardIndex: uint64(i % 16), + ShardCount: 16, + } + } + + ss := q.Select(sorted, hints, matcher) for ss.Next() { } require.NoError(b, ss.Err()) @@ -182,10 +196,20 @@ func BenchmarkQuerierSelect(b *testing.B) { }
b.Run("Head", func(b *testing.B) { - bench(b, h, false) + b.Run("without sharding", func(b *testing.B) { + bench(b, h, false, false) + }) + b.Run("with sharding", func(b *testing.B) { + bench(b, h, false, true) + }) }) b.Run("SortedHead", func(b *testing.B) { - bench(b, h, true) + b.Run("without sharding", func(b *testing.B) { + bench(b, h, true, false) + }) + b.Run("with sharding", func(b *testing.B) { + bench(b, h, true, true) + }) }) tmpdir, err := ioutil.TempDir("", "test_benchquerierselect") @@ -194,14 +218,20 @@ func BenchmarkQuerierSelect(b *testing.B) { require.NoError(b, os.RemoveAll(tmpdir)) }() + seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024) blockdir := createBlockFromHead(b, tmpdir, h) - block, err := OpenBlock(nil, blockdir, nil) + block, err := OpenBlock(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test")) require.NoError(b, err) defer func() { require.NoError(b, block.Close()) }() b.Run("Block", func(b *testing.B) { - bench(b, block, false) + b.Run("without sharding", func(b *testing.B) { + bench(b, block, false, false) + }) + b.Run("with sharding", func(b *testing.B) { + bench(b, block, false, true) + }) }) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 0588f66194..18e237493a 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1323,7 +1323,7 @@ func BenchmarkQueryIterator(b *testing.B) { } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer block.Close() @@ -1390,7 +1390,7 @@ func BenchmarkQuerySeek(b *testing.B) { } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) require.NoError(b, 
err) blocks = append(blocks, block) defer block.Close() @@ -1529,7 +1529,7 @@ func BenchmarkSetMatcher(b *testing.B) { } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil) require.NoError(b, err) blocks = append(blocks, block) defer block.Close() @@ -1984,7 +1984,7 @@ func BenchmarkQueries(b *testing.B) { qs := make([]storage.Querier, 0, 10) for x := 0; x <= 10; x++ { - block, err := OpenBlock(nil, createBlock(b, dir, series), nil) + block, err := OpenBlock(nil, createBlock(b, dir, series), nil, nil) require.NoError(b, err) q, err := NewBlockQuerier(block, 1, int64(nSamples)) require.NoError(b, err) diff --git a/tsdb/repair_test.go b/tsdb/repair_test.go index 7fb2720fd6..aa0dc109cb 100644 --- a/tsdb/repair_test.go +++ b/tsdb/repair_test.go @@ -81,7 +81,7 @@ func TestRepairBadIndexVersion(t *testing.T) { require.NoError(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0777)) // Read current index to check integrity. - r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) + r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), nil) require.NoError(t, err) p, err := r.Postings("b", "1") require.NoError(t, err) @@ -99,7 +99,7 @@ func TestRepairBadIndexVersion(t *testing.T) { require.NoError(t, err) db.Close() - r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) + r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), nil) require.NoError(t, err) defer r.Close() p, err = r.Postings("b", "1")