diff --git a/cmd/compact/main.go b/cmd/compact/main.go
index 7cc4b75934..9c26db4af3 100644
--- a/cmd/compact/main.go
+++ b/cmd/compact/main.go
@@ -11,6 +11,7 @@ import (
 
 	golog "github.com/go-kit/log"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb"
 )
 
@@ -71,7 +72,7 @@ func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
 	defer cancel()
 
-	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true)
+	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true, func(l labels.Labels) uint64 { return l.Hash() })
 	if err != nil {
 		log.Fatalln("creating compator", err)
 	}
diff --git a/tsdb/block.go b/tsdb/block.go
index cff320f4d9..3f9a82589a 100644
--- a/tsdb/block.go
+++ b/tsdb/block.go
@@ -87,7 +87,7 @@ type IndexReader interface {
 
 	// ShardedPostings returns a postings list filtered by the provided shardIndex
 	// out of shardCount. For a given posting, its shard MUST be computed hashing
-	// the series labels mod shardCount (eg. `labels.Hash() % shardCount == shardIndex`).
+	// the series labels mod shardCount, using a hash function which is consistent over time.
 	ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
 
 	// Series populates the given builder and chunk metas for the series identified
@@ -322,11 +322,11 @@ type Block struct {
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
 func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
-	return OpenBlockWithCache(logger, dir, pool, nil)
+	return OpenBlockWithOptions(logger, dir, pool, nil, nil)
 }
 
-// OpenBlockWithCache is like OpenBlock but allows to pass a cache provider.
-func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) {
+// OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function.
+func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (pb *Block, err error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -347,7 +347,7 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache
 	}
 	closers = append(closers, cr)
 
-	indexReader, err := index.NewFileReaderWithCache(filepath.Join(dir, indexFilename), cache)
+	indexReader, err := index.NewFileReaderWithOptions(filepath.Join(dir, indexFilename), cache, shardFunc)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index fff2e75233..4105515252 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -91,6 +91,7 @@ type LeveledCompactor struct {
 	maxBlockChunkSegmentSize int64
 	mergeFunc storage.VerticalChunkSeriesMergeFunc
 	enableOverlappingCompaction bool
+	shardFunc func(labels.Labels) uint64
 
 	concurrencyOpts LeveledCompactorConcurrencyOptions
 }
@@ -157,10 +158,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 
 // NewLeveledCompactor returns a LeveledCompactor.
 func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction, nil)
 }
 
-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool, shardFunc func(labels.Labels) uint64) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, errors.Errorf("at least one range must be provided")
 	}
@@ -183,6 +184,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 		mergeFunc: mergeFunc,
 		concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(),
 		enableOverlappingCompaction: enableOverlappingCompaction,
+		shardFunc: shardFunc,
 	}, nil
 }
 
@@ -1105,7 +1107,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
 
 		obIx := uint64(0)
 		if len(outBlocks) > 1 {
-			obIx = s.Labels().Hash() % uint64(len(outBlocks))
+			obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
 		}
 
 		err := blockWriters[obIx].addSeries(s.Labels(), chks)
@@ -1171,7 +1173,10 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
 
 			s := seriesSet.At()
 
-			obIx := s.Labels().Hash() % uint64(len(outBlocks))
+			var obIx uint64
+			if len(outBlocks) > 1 {
+				obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
+			}
 
 			for _, l := range s.Labels() {
 				if err := batchers[obIx].addSymbol(l.Name); err != nil {
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 0b3f3f145a..60ab52d393 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -433,13 +433,13 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 }
 
 func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{
+	compactor, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{
 		20,
 		60,
 		240,
 		720,
 		2160,
-	}, nil, nil, true)
+	}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -503,6 +503,10 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa
 	return ret
 }
 
+func shardFunc(l labels.Labels) uint64 {
+	return l.Hash()
+}
+
 func TestCompaction_CompactWithSplitting(t *testing.T) {
 	seriesCounts := []int{10, 1234}
 	shardCounts := []uint64{1, 13}
@@ -533,7 +537,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
 
 		for _, shardCount := range shardCounts {
 			t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
-				c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
+				c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
 				require.NoError(t, err)
 
 				blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
@@ -667,7 +671,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) {
 		blockDirs = append(blockDirs, bdir)
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
+	c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
 	require.NoError(t, err)
 
 	blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
@@ -1142,7 +1146,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 			}
 
-			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, true)
+			c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
 			require.NoError(t, err)
 
 			meta := &BlockMeta{
diff --git a/tsdb/db.go b/tsdb/db.go
index 09a19e18e8..32dce66ec7 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -201,6 +201,9 @@ type Options struct {
 	HeadPostingsForMatchersCacheSize int
 	// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
 	HeadPostingsForMatchersCacheForce bool
+
+	// Compute hash of labels to divide series into shards.
+	ShardFunc func(l labels.Labels) uint64
 }
 
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -565,7 +568,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
 		return nil, ErrClosed
 	default:
 	}
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -763,7 +766,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction, opts.ShardFunc)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
 	}
@@ -817,6 +820,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
 		headOpts.IsolationDisabled = opts.IsolationDisabled
 	}
+	headOpts.ShardFunc = opts.ShardFunc
 	db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
 	if err != nil {
 		return nil, err
 	}
@@ -1346,7 +1350,7 @@ func (db *DB) reloadBlocks() (err error) {
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.ShardFunc)
 	if err != nil {
 		return err
 	}
@@ -1429,7 +1433,7 @@ func (db *DB) reloadBlocks() (err error) {
 	return nil
 }
-func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, shardFunc func(l labels.Labels) uint64) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
 	bDirs, err := blockDirs(dir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "find blocks")
 	}
@@ -1451,7 +1455,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 				cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
 			}
 
-			block, err = OpenBlockWithCache(l, bDir, chunkPool, cacheProvider)
+			block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, shardFunc)
 			if err != nil {
 				corrupted[meta.ULID] = err
 				continue
diff --git a/tsdb/head.go b/tsdb/head.go
index e61b565344..8ca8fce368 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -176,6 +176,8 @@ type HeadOptions struct {
 	PostingsForMatchersCacheTTL time.Duration
 	PostingsForMatchersCacheSize int
 	PostingsForMatchersCacheForce bool
+
+	ShardFunc func(l labels.Labels) uint64 // Compute hash of labels to divide series into shards.
 }
 
 const (
@@ -1566,8 +1568,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
 }
 
 func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
+	shardHash := hash
+	if h.opts.ShardFunc != nil {
+		shardHash = h.opts.ShardFunc(lset)
+	}
 	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
-		return newMemSeries(lset, id, hash, h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
+		return newMemSeries(lset, id, shardHash, h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
 	})
 	if err != nil {
 		return nil, false, err
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 7b5023ca35..823f8b570c 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -2438,6 +2438,7 @@ func TestHeadShardedPostings(t *testing.T) {
 	defer func() {
 		require.NoError(t, head.Close())
 	}()
+	head.opts.ShardFunc = func(l labels.Labels) uint64 { return l.Hash() }
 
 	// Append some series.
 	app := head.Appender(context.Background())
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index d4a921ad0d..c3b81bbb27 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -1090,6 +1090,8 @@ type Reader struct {
 
 	// Provides a cache mapping series labels hash by series ID.
 	cacheProvider ReaderCacheProvider
+
+	shardFunc func(l labels.Labels) uint64 // Computes a hash of Labels; must be consistent over time.
 }
 
 type postingOffset struct {
@@ -1130,11 +1132,11 @@ func NewReaderWithCache(b ByteSlice, cacheProvider ReaderCacheProvider) (*Reader
 
 // NewFileReader returns a new index reader against the given index file.
 func NewFileReader(path string) (*Reader, error) {
-	return NewFileReaderWithCache(path, nil)
+	return NewFileReaderWithOptions(path, nil, nil)
 }
 
-// NewFileReaderWithCache is like NewFileReader but allows to pass a cache provider.
-func NewFileReaderWithCache(path string, cacheProvider ReaderCacheProvider) (*Reader, error) {
+// NewFileReaderWithOptions is like NewFileReader but allows to pass a cache provider and sharding function.
+func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (*Reader, error) {
 	f, err := fileutil.OpenMmapFile(path)
 	if err != nil {
 		return nil, err
@@ -1146,6 +1148,7 @@ func NewFileReaderWithCache(path string, cacheProvider ReaderCacheProvider) (*Re
 			f.Close(),
 		).Err()
 	}
+	r.shardFunc = shardFunc
 
 	return r, nil
 }
@@ -1771,7 +1774,7 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post
 			return ErrPostings(errors.Errorf("series %d not found", id))
 		}
 
-		hash = bufLbls.Labels().Hash()
+		hash = r.shardFunc(bufLbls.Labels())
 		if seriesHashCache != nil {
 			seriesHashCache.Store(id, hash)
 		}
diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go
index d29a74641a..0ecb6447c4 100644
--- a/tsdb/index/index_test.go
+++ b/tsdb/index/index_test.go
@@ -249,7 +249,7 @@ func TestIndexRW_Postings(t *testing.T) {
 				cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
 			}
 
-			ir, err := NewFileReaderWithCache(fn, cache)
+			ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
 			require.NoError(t, err)
 
 			// List all postings for a given label value. This is what we expect to get
@@ -646,7 +646,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
 	}
 
 	// Create a reader to read back all postings from the index.
-	ir, err := NewFileReaderWithCache(fn, cache)
+	ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
 	require.NoError(b, err)
 
 	b.ResetTimer()
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go
index 54dc8702e0..78adce1f41 100644
--- a/tsdb/querier_bench_test.go
+++ b/tsdb/querier_bench_test.go
@@ -271,7 +271,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 	seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
 	blockdir := createBlockFromHead(b, tmpdir, h)
-	block, err := OpenBlockWithCache(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"))
+	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), func(l labels.Labels) uint64 { return l.Hash() })
 	require.NoError(b, err)
 	defer func() {
 		require.NoError(b, block.Close())