diff --git a/cmd/compact/main.go b/cmd/compact/main.go
index 9c26db4af3..7cc4b75934 100644
--- a/cmd/compact/main.go
+++ b/cmd/compact/main.go
@@ -11,7 +11,6 @@ import (
 	golog "github.com/go-kit/log"
-	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb"
 )
@@ -72,7 +71,7 @@ func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
 	defer cancel()
 
-	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true, func(l labels.Labels) uint64 { return l.Hash() })
+	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true)
 	if err != nil {
 		log.Fatalln("creating compator", err)
 	}
diff --git a/model/labels/sharding.go b/model/labels/sharding.go
new file mode 100644
index 0000000000..9b51c3ab88
--- /dev/null
+++ b/model/labels/sharding.go
@@ -0,0 +1,32 @@
+package labels
+
+import (
+	"github.com/cespare/xxhash/v2"
+)
+
+// StableHash is a labels hashing implementation which is guaranteed to not change over time.
+// This function should be used whenever labels hashing backward compatibility must be guaranteed.
+func StableHash(ls Labels) uint64 {
+	// Use xxhash.Sum64(b) for fast path as it's faster.
+	b := make([]byte, 0, 1024)
+	for i, v := range ls {
+		if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
+			// If labels entry is 1KB+ do not allocate whole entry.
+			h := xxhash.New()
+			_, _ = h.Write(b)
+			for _, v := range ls[i:] {
+				_, _ = h.WriteString(v.Name)
+				_, _ = h.Write(seps)
+				_, _ = h.WriteString(v.Value)
+				_, _ = h.Write(seps)
+			}
+			return h.Sum64()
+		}
+
+		b = append(b, v.Name...)
+		b = append(b, seps[0])
+		b = append(b, v.Value...)
+		b = append(b, seps[0])
+	}
+	return xxhash.Sum64(b)
+}
diff --git a/tsdb/block.go b/tsdb/block.go
index 3f9a82589a..01560033cd 100644
--- a/tsdb/block.go
+++ b/tsdb/block.go
@@ -322,11 +322,11 @@ type Block struct {
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
 func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
-	return OpenBlockWithOptions(logger, dir, pool, nil, nil)
+	return OpenBlockWithOptions(logger, dir, pool, nil)
 }
 
 // OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function.
-func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (pb *Block, err error) {
+func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -347,7 +347,7 @@ func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cac
 	}
 	closers = append(closers, cr)
 
-	indexReader, err := index.NewFileReaderWithOptions(filepath.Join(dir, indexFilename), cache, shardFunc)
+	indexReader, err := index.NewFileReaderWithOptions(filepath.Join(dir, indexFilename), cache)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 4105515252..70402197ef 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -91,7 +91,6 @@ type LeveledCompactor struct {
 	maxBlockChunkSegmentSize    int64
 	mergeFunc                   storage.VerticalChunkSeriesMergeFunc
 	enableOverlappingCompaction bool
-	shardFunc                   func(labels.Labels) uint64
 
 	concurrencyOpts LeveledCompactorConcurrencyOptions
 }
@@ -158,10 +157,10 @@
 // NewLeveledCompactor returns a LeveledCompactor.
 func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction, nil)
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
 }
 
-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool, shardFunc func(labels.Labels) uint64) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, errors.Errorf("at least one range must be provided")
 	}
@@ -184,7 +183,6 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 		mergeFunc:                   mergeFunc,
 		concurrencyOpts:             DefaultLeveledCompactorConcurrencyOptions(),
 		enableOverlappingCompaction: enableOverlappingCompaction,
-		shardFunc:                   shardFunc,
 	}, nil
 }
@@ -1107,7 +1105,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
 			obIx := uint64(0)
 			if len(outBlocks) > 1 {
-				obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
+				obIx = labels.StableHash(s.Labels()) % uint64(len(outBlocks))
 			}
 
 			err := blockWriters[obIx].addSeries(s.Labels(), chks)
@@ -1175,7 +1173,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
 		var obIx uint64
 		if len(outBlocks) > 1 {
-			obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
+			obIx = labels.StableHash(s.Labels()) % uint64(len(outBlocks))
 		}
 
 		for _, l := range s.Labels() {
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index eab7bf4225..e97d5275f8 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -439,7 +439,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+	}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -503,10 +503,6 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa
 	return ret
 }
 
-func shardFunc(l labels.Labels) uint64 {
-	return l.Hash()
-}
-
 func TestCompaction_CompactWithSplitting(t *testing.T) {
 	seriesCounts := []int{10, 1234}
 	shardCounts := []uint64{1, 13}
@@ -537,7 +533,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
 		for _, shardCount := range shardCounts {
 			t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
-				c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+				c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 				require.NoError(t, err)
 
 				blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
@@ -671,7 +667,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) {
 		blockDirs = append(blockDirs, bdir)
 	}
 
-	c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+	c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 	require.NoError(t, err)
 
 	blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
@@ -1146,7 +1142,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 			}
 
-			c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+			c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 			require.NoError(t, err)
 
 			meta := &BlockMeta{
diff --git a/tsdb/db.go b/tsdb/db.go
index 32dce66ec7..c39b0786da 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -201,9 +201,6 @@ type Options struct {
 	HeadPostingsForMatchersCacheSize int
 	// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
 	HeadPostingsForMatchersCacheForce bool
-
-	// Compute hash of labels to divide series into shards.
-	ShardFunc func(l labels.Labels) uint64
 }
 
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -568,7 +565,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
 		return nil, ErrClosed
 	default:
 	}
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, nil)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -766,7 +763,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction, opts.ShardFunc)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
@@ -820,7 +817,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
 		headOpts.IsolationDisabled = opts.IsolationDisabled
 	}
-	headOpts.ShardFunc = opts.ShardFunc
 	db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
 	if err != nil {
 		return nil, err
 	}
@@ -1350,7 +1346,7 @@ func (db *DB) reloadBlocks() (err error) {
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.ShardFunc)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache)
 	if err != nil {
 		return err
 	}
@@ -1433,7 +1429,7 @@ func (db *DB) reloadBlocks() (err error) {
 	return nil
 }
 
-func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, shardFunc func(l labels.Labels) uint64) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
 	bDirs, err := blockDirs(dir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "find blocks")
 	}
@@ -1455,7 +1451,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 			cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
 		}
 
-		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, shardFunc)
+		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider)
 		if err != nil {
 			corrupted[meta.ULID] = err
 			continue
diff --git a/tsdb/head.go b/tsdb/head.go
index 81e39f71a4..493552d9b9 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -176,8 +176,6 @@ type HeadOptions struct {
 	PostingsForMatchersCacheTTL   time.Duration
 	PostingsForMatchersCacheSize  int
 	PostingsForMatchersCacheForce bool
-
-	ShardFunc func(l labels.Labels) uint64 // Compute hash of labels to divide series into shards.
 }
 
 const (
@@ -1568,12 +1566,8 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
 }
 
 func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
-	shardHash := hash
-	if h.opts.ShardFunc != nil {
-		shardHash = h.opts.ShardFunc(lset)
-	}
 	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
-		return newMemSeries(lset, id, shardHash, h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
+		return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
 	})
 	if err != nil {
 		return nil, false, err
 	}
@@ -1862,9 +1856,11 @@ type memSeries struct {
 	ref  chunks.HeadSeriesRef
 	lset labels.Labels
-	hash uint64
 	meta *metadata.Metadata
 
+	// Series labels hash to use for sharding purposes.
+	shardHash uint64
+
 	// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
 	// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
 	//
@@ -1914,13 +1910,13 @@ type memSeriesOOOFields struct {
 	firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
 }
 
-func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, hash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
+func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
 	s := &memSeries{
 		lset:                 lset,
 		ref:                  id,
 		nextAt:               math.MinInt64,
 		chunkEndTimeVariance: chunkEndTimeVariance,
-		hash:                 hash,
+		shardHash:            shardHash,
 	}
 	if !isolationDisabled {
 		s.txs = newTxRing(4)
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index dfd76264be..6cfcad575e 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -1383,7 +1383,7 @@ func (s *memSeries) appendPreprocessor(
 		maxNextAt := s.nextAt
 
 		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt)
-		s.nextAt = addJitterToChunkEndTime(s.hash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
+		s.nextAt = addJitterToChunkEndTime(s.shardHash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
 	}
 	// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
 	// most likely because samples rate has changed and now they are arriving more frequently.
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index e8e852b8f5..fe42f28206 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -164,7 +164,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
 		}
 
 		// Check if the series belong to the shard.
-		if s.hash%shardCount != shardIndex {
+		if s.shardHash%shardCount != shardIndex {
 			continue
 		}
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 0361a3efab..55be98099b 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -242,7 +242,7 @@ func BenchmarkLoadWAL(b *testing.B) {
 			for k := 0; k < c.batches*c.seriesPerBatch; k++ {
 				// Create one mmapped chunk per series, with one sample at the given time.
 				lbls := labels.Labels{}
-				s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, lbls.Hash(), 0, defaultIsolationDisabled)
+				s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 				s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT)
 				s.mmapCurrentHeadChunk(chunkDiskMapper)
 			}
@@ -758,7 +758,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	}
 
 	lbls := labels.FromStrings("a", "b")
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	for i := 0; i < 4000; i += 5 {
 		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
@@ -1290,7 +1290,7 @@ func TestMemSeries_append(t *testing.T) {
 	const chunkRange = 500
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
@@ -1345,7 +1345,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
 	chunkRange := int64(1000)
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	histograms := GenerateTestHistograms(4)
 	histogramWithOneMoreBucket := histograms[3].Copy()
@@ -1402,7 +1402,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
 	chunkRange := DefaultBlockDuration
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	// At this slow rate, we will fill the chunk in two block durations.
 	slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@@ -2437,7 +2437,6 @@ func TestHeadShardedPostings(t *testing.T) {
 	defer func() {
 		require.NoError(t, head.Close())
 	}()
-	head.opts.ShardFunc = func(l labels.Labels) uint64 { return l.Hash() }
 
 	// Append some series.
 	app := head.Appender(context.Background())
@@ -2630,7 +2629,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
 	const chunkRange = 500
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	for i := 0; i < 7; i++ {
 		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index c3b81bbb27..057d037c41 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -1090,8 +1090,6 @@ type Reader struct {
 	// Provides a cache mapping series labels hash by series ID.
 	cacheProvider ReaderCacheProvider
-
-	shardFunc func(l labels.Labels) uint64 // Computes a hash of Labels; must be consistent over time.
 }
 
 type postingOffset struct {
@@ -1132,11 +1130,11 @@ func NewReaderWithCache(b ByteSlice, cacheProvider ReaderCacheProvider) (*Reader
 
 // NewFileReader returns a new index reader against the given index file.
 func NewFileReader(path string) (*Reader, error) {
-	return NewFileReaderWithOptions(path, nil, nil)
+	return NewFileReaderWithOptions(path, nil)
 }
 
 // NewFileReaderWithOptions is like NewFileReader but allows to pass a cache provider and sharding function.
-func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (*Reader, error) {
+func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider) (*Reader, error) {
 	f, err := fileutil.OpenMmapFile(path)
 	if err != nil {
 		return nil, err
 	}
@@ -1148,7 +1146,6 @@ func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider, sh
 			f.Close(),
 		).Err()
 	}
-	r.shardFunc = shardFunc
 
 	return r, nil
 }
@@ -1774,7 +1771,7 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post
 			return ErrPostings(errors.Errorf("series %d not found", id))
 		}
 
-		hash = r.shardFunc(bufLbls.Labels())
+		hash = labels.StableHash(bufLbls.Labels())
 		if seriesHashCache != nil {
 			seriesHashCache.Store(id, hash)
 		}
diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go
index 0ecb6447c4..2732a0d42b 100644
--- a/tsdb/index/index_test.go
+++ b/tsdb/index/index_test.go
@@ -249,7 +249,7 @@ func TestIndexRW_Postings(t *testing.T) {
 		cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
 	}
 
-	ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
+	ir, err := NewFileReaderWithOptions(fn, cache)
 	require.NoError(t, err)
 
 	// List all postings for a given label value. This is what we expect to get
@@ -646,7 +646,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
 	}
 
 	// Create a reader to read back all postings from the index.
-	ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
+	ir, err := NewFileReaderWithOptions(fn, cache)
 	require.NoError(b, err)
 
 	b.ResetTimer()
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go
index 78adce1f41..0aa3fc200f 100644
--- a/tsdb/querier_bench_test.go
+++ b/tsdb/querier_bench_test.go
@@ -271,7 +271,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 	seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
 	blockdir := createBlockFromHead(b, tmpdir, h)
-	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), func(l labels.Labels) uint64 { return l.Hash() })
+	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"))
 	require.NoError(b, err)
 	defer func() {
 		require.NoError(b, block.Close())