mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
Merge pull request #369 from grafana/shard-func
* tsdb: make sharding function a parameter Instead of relying on `labels.Hash()`, which may change, have the caller pass in a shard function if required. For most purposes `tsdb.Options.ShardFunc` is used, but the compactor may be created independently so `NewLeveledCompactorWithChunkSize` also takes a shard function parameter. Regular Prometheus, which does not use block sharding, will have this parameter as nil. Rename WithCache functions as WithOptions Where they now have 2 or more extra parameters. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
commit
2b4c652e10
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
golog "github.com/go-kit/log"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
)
|
||||
|
||||
|
@ -71,7 +72,7 @@ func main() {
|
|||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true)
|
||||
c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true, func(l labels.Labels) uint64 { return l.Hash() })
|
||||
if err != nil {
|
||||
log.Fatalln("creating compator", err)
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ type IndexReader interface {
|
|||
|
||||
// ShardedPostings returns a postings list filtered by the provided shardIndex
|
||||
// out of shardCount. For a given posting, its shard MUST be computed hashing
|
||||
// the series labels mod shardCount (eg. `labels.Hash() % shardCount == shardIndex`).
|
||||
// the series labels mod shardCount, using a hash function which is consistent over time.
|
||||
ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
|
||||
|
||||
// Series populates the given builder and chunk metas for the series identified
|
||||
|
@ -322,11 +322,11 @@ type Block struct {
|
|||
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
|
||||
// to instantiate chunk structs.
|
||||
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
|
||||
return OpenBlockWithCache(logger, dir, pool, nil)
|
||||
return OpenBlockWithOptions(logger, dir, pool, nil, nil)
|
||||
}
|
||||
|
||||
// OpenBlockWithCache is like OpenBlock but allows to pass a cache provider.
|
||||
func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) {
|
||||
// OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function.
|
||||
func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (pb *Block, err error) {
|
||||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
}
|
||||
|
@ -347,7 +347,7 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache
|
|||
}
|
||||
closers = append(closers, cr)
|
||||
|
||||
indexReader, err := index.NewFileReaderWithCache(filepath.Join(dir, indexFilename), cache)
|
||||
indexReader, err := index.NewFileReaderWithOptions(filepath.Join(dir, indexFilename), cache, shardFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -91,6 +91,7 @@ type LeveledCompactor struct {
|
|||
maxBlockChunkSegmentSize int64
|
||||
mergeFunc storage.VerticalChunkSeriesMergeFunc
|
||||
enableOverlappingCompaction bool
|
||||
shardFunc func(labels.Labels) uint64
|
||||
|
||||
concurrencyOpts LeveledCompactorConcurrencyOptions
|
||||
}
|
||||
|
@ -157,10 +158,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
|||
|
||||
// NewLeveledCompactor returns a LeveledCompactor.
|
||||
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
|
||||
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
|
||||
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction, nil)
|
||||
}
|
||||
|
||||
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
|
||||
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool, shardFunc func(labels.Labels) uint64) (*LeveledCompactor, error) {
|
||||
if len(ranges) == 0 {
|
||||
return nil, errors.Errorf("at least one range must be provided")
|
||||
}
|
||||
|
@ -183,6 +184,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
|
|||
mergeFunc: mergeFunc,
|
||||
concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(),
|
||||
enableOverlappingCompaction: enableOverlappingCompaction,
|
||||
shardFunc: shardFunc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1105,7 +1107,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
|
|||
|
||||
obIx := uint64(0)
|
||||
if len(outBlocks) > 1 {
|
||||
obIx = s.Labels().Hash() % uint64(len(outBlocks))
|
||||
obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
|
||||
}
|
||||
|
||||
err := blockWriters[obIx].addSeries(s.Labels(), chks)
|
||||
|
@ -1171,7 +1173,10 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
|
|||
|
||||
s := seriesSet.At()
|
||||
|
||||
obIx := s.Labels().Hash() % uint64(len(outBlocks))
|
||||
var obIx uint64
|
||||
if len(outBlocks) > 1 {
|
||||
obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
|
||||
}
|
||||
|
||||
for _, l := range s.Labels() {
|
||||
if err := batchers[obIx].addSymbol(l.Name); err != nil {
|
||||
|
|
|
@ -433,13 +433,13 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
|
||||
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{
|
||||
compactor, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{
|
||||
20,
|
||||
60,
|
||||
240,
|
||||
720,
|
||||
2160,
|
||||
}, nil, nil, true)
|
||||
}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpdir := t.TempDir()
|
||||
|
@ -503,6 +503,10 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa
|
|||
return ret
|
||||
}
|
||||
|
||||
func shardFunc(l labels.Labels) uint64 {
|
||||
return l.Hash()
|
||||
}
|
||||
|
||||
func TestCompaction_CompactWithSplitting(t *testing.T) {
|
||||
seriesCounts := []int{10, 1234}
|
||||
shardCounts := []uint64{1, 13}
|
||||
|
@ -533,7 +537,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
|
|||
|
||||
for _, shardCount := range shardCounts {
|
||||
t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
|
||||
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
|
||||
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
|
||||
require.NoError(t, err)
|
||||
|
||||
blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
|
||||
|
@ -667,7 +671,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) {
|
|||
blockDirs = append(blockDirs, bdir)
|
||||
}
|
||||
|
||||
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
|
||||
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
|
||||
require.NoError(t, err)
|
||||
|
||||
blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
|
||||
|
@ -1142,7 +1146,7 @@ func TestCompaction_populateBlock(t *testing.T) {
|
|||
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
|
||||
}
|
||||
|
||||
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, true)
|
||||
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
|
||||
require.NoError(t, err)
|
||||
|
||||
meta := &BlockMeta{
|
||||
|
|
14
tsdb/db.go
14
tsdb/db.go
|
@ -201,6 +201,9 @@ type Options struct {
|
|||
HeadPostingsForMatchersCacheSize int
|
||||
// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
|
||||
HeadPostingsForMatchersCacheForce bool
|
||||
|
||||
// Compute hash of labels to divide series into shards.
|
||||
ShardFunc func(l labels.Labels) uint64
|
||||
}
|
||||
|
||||
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
|
||||
|
@ -565,7 +568,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
|
|||
return nil, ErrClosed
|
||||
default:
|
||||
}
|
||||
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil)
|
||||
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -763,7 +766,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction)
|
||||
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction, opts.ShardFunc)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, errors.Wrap(err, "create leveled compactor")
|
||||
|
@ -817,6 +820,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
|
||||
headOpts.IsolationDisabled = opts.IsolationDisabled
|
||||
}
|
||||
headOpts.ShardFunc = opts.ShardFunc
|
||||
db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1346,7 +1350,7 @@ func (db *DB) reloadBlocks() (err error) {
|
|||
db.mtx.Lock()
|
||||
defer db.mtx.Unlock()
|
||||
|
||||
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache)
|
||||
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.ShardFunc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1429,7 +1433,7 @@ func (db *DB) reloadBlocks() (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
|
||||
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, shardFunc func(l labels.Labels) uint64) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
|
||||
bDirs, err := blockDirs(dir)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "find blocks")
|
||||
|
@ -1451,7 +1455,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
|
|||
cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
|
||||
}
|
||||
|
||||
block, err = OpenBlockWithCache(l, bDir, chunkPool, cacheProvider)
|
||||
block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, shardFunc)
|
||||
if err != nil {
|
||||
corrupted[meta.ULID] = err
|
||||
continue
|
||||
|
|
|
@ -176,6 +176,8 @@ type HeadOptions struct {
|
|||
PostingsForMatchersCacheTTL time.Duration
|
||||
PostingsForMatchersCacheSize int
|
||||
PostingsForMatchersCacheForce bool
|
||||
|
||||
ShardFunc func(l labels.Labels) uint64 // Compute hash of labels to divide series into shards.
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -1566,8 +1568,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
|
|||
}
|
||||
|
||||
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
shardHash := hash
|
||||
if h.opts.ShardFunc != nil {
|
||||
shardHash = h.opts.ShardFunc(lset)
|
||||
}
|
||||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||||
return newMemSeries(lset, id, hash, h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
|
||||
return newMemSeries(lset, id, shardHash, h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
|
|
|
@ -2438,6 +2438,7 @@ func TestHeadShardedPostings(t *testing.T) {
|
|||
defer func() {
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
head.opts.ShardFunc = func(l labels.Labels) uint64 { return l.Hash() }
|
||||
|
||||
// Append some series.
|
||||
app := head.Appender(context.Background())
|
||||
|
|
|
@ -1090,6 +1090,8 @@ type Reader struct {
|
|||
|
||||
// Provides a cache mapping series labels hash by series ID.
|
||||
cacheProvider ReaderCacheProvider
|
||||
|
||||
shardFunc func(l labels.Labels) uint64 // Computes a hash of Labels; must be consistent over time.
|
||||
}
|
||||
|
||||
type postingOffset struct {
|
||||
|
@ -1130,11 +1132,11 @@ func NewReaderWithCache(b ByteSlice, cacheProvider ReaderCacheProvider) (*Reader
|
|||
|
||||
// NewFileReader returns a new index reader against the given index file.
|
||||
func NewFileReader(path string) (*Reader, error) {
|
||||
return NewFileReaderWithCache(path, nil)
|
||||
return NewFileReaderWithOptions(path, nil, nil)
|
||||
}
|
||||
|
||||
// NewFileReaderWithCache is like NewFileReader but allows to pass a cache provider.
|
||||
func NewFileReaderWithCache(path string, cacheProvider ReaderCacheProvider) (*Reader, error) {
|
||||
// NewFileReaderWithOptions is like NewFileReader but allows to pass a cache provider and sharding function.
|
||||
func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (*Reader, error) {
|
||||
f, err := fileutil.OpenMmapFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1146,6 +1148,7 @@ func NewFileReaderWithCache(path string, cacheProvider ReaderCacheProvider) (*Re
|
|||
f.Close(),
|
||||
).Err()
|
||||
}
|
||||
r.shardFunc = shardFunc
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
@ -1771,7 +1774,7 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post
|
|||
return ErrPostings(errors.Errorf("series %d not found", id))
|
||||
}
|
||||
|
||||
hash = bufLbls.Labels().Hash()
|
||||
hash = r.shardFunc(bufLbls.Labels())
|
||||
if seriesHashCache != nil {
|
||||
seriesHashCache.Store(id, hash)
|
||||
}
|
||||
|
|
|
@ -249,7 +249,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
|
||||
}
|
||||
|
||||
ir, err := NewFileReaderWithCache(fn, cache)
|
||||
ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
|
||||
require.NoError(t, err)
|
||||
|
||||
// List all postings for a given label value. This is what we expect to get
|
||||
|
@ -646,7 +646,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
|
|||
}
|
||||
|
||||
// Create a reader to read back all postings from the index.
|
||||
ir, err := NewFileReaderWithCache(fn, cache)
|
||||
ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
|
||||
require.NoError(b, err)
|
||||
|
||||
b.ResetTimer()
|
||||
|
|
|
@ -271,7 +271,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
|
||||
seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
|
||||
blockdir := createBlockFromHead(b, tmpdir, h)
|
||||
block, err := OpenBlockWithCache(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"))
|
||||
block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), func(l labels.Labels) uint64 { return l.Hash() })
|
||||
require.NoError(b, err)
|
||||
defer func() {
|
||||
require.NoError(b, block.Close())
|
||||
|
|
Loading…
Reference in a new issue