Hardcode the labels stable hash function instead of taking it as an option
Signed-off-by: Marco Pracucci <marco@pracucci.com>
parent 300116a20c
commit 950c177c72
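
This commit removes the optional `shardFunc func(labels.Labels) uint64` that the compactor, head, block, and index reader used to accept, and hardcodes the new `labels.StableHash` everywhere series are assigned to shards. A minimal sketch of the caller-side call shape after this commit, using the post-commit signature from this diff (the nop logger and single zero range are illustrative placeholders, not from the diff):

```go
package main

import (
	"context"
	stdlog "log"

	gokitlog "github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	// The trailing shard-function argument is gone; the compactor now
	// always shards series with labels.StableHash internally.
	c, err := tsdb.NewLeveledCompactorWithChunkSize(
		context.Background(), nil, gokitlog.NewNopLogger(),
		[]int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true,
	)
	if err != nil {
		stdlog.Fatalln("creating compactor", err)
	}
	_ = c
}
```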
@@ -11,7 +11,6 @@ import (
 	golog "github.com/go-kit/log"
 
-	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb"
 )
@@ -72,7 +71,7 @@ func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
 	defer cancel()
 
-	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true, func(l labels.Labels) uint64 { return l.Hash() })
+	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true)
 	if err != nil {
 		log.Fatalln("creating compator", err)
 	}
model/labels/sharding.go (new file, 32 lines)

@@ -0,0 +1,32 @@
+package labels
+
+import (
+	"github.com/cespare/xxhash/v2"
+)
+
+// StableHash is a labels hashing implementation which is guaranteed to not change over time.
+// This function should be used whenever labels hashing backward compatibility must be guaranteed.
+func StableHash(ls Labels) uint64 {
+	// Use xxhash.Sum64(b) for fast path as it's faster.
+	b := make([]byte, 0, 1024)
+	for i, v := range ls {
+		if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
+			// If labels entry is 1KB+ do not allocate whole entry.
+			h := xxhash.New()
+			_, _ = h.Write(b)
+			for _, v := range ls[i:] {
+				_, _ = h.WriteString(v.Name)
+				_, _ = h.Write(seps)
+				_, _ = h.WriteString(v.Value)
+				_, _ = h.Write(seps)
+			}
+			return h.Sum64()
+		}
+
+		b = append(b, v.Name...)
+		b = append(b, seps[0])
+		b = append(b, v.Value...)
+		b = append(b, seps[0])
+	}
+	return xxhash.Sum64(b)
+}
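
A short usage sketch of the new helper. `seps` in the function above is the package-level label separator already used by `Labels.Hash`; the example below relies only on the exported API, and the label set and shard count are illustrative, not from the diff:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	ls := labels.FromStrings("__name__", "http_requests_total", "job", "api")

	// Unlike Labels.Hash, StableHash is frozen: the same label set must map
	// to the same value in every future version, so the result is safe to
	// persist, e.g. in shard assignments derived from it.
	const shardCount = 16
	shard := labels.StableHash(ls) % shardCount
	fmt.Printf("hash=%d shard=%d\n", labels.StableHash(ls), shard)
}
```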

@@ -322,11 +322,11 @@ type Block struct {
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
 func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
-	return OpenBlockWithOptions(logger, dir, pool, nil, nil)
+	return OpenBlockWithOptions(logger, dir, pool, nil)
 }
 
 // OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function.
-func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (pb *Block, err error) {
+func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -347,7 +347,7 @@ func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cac
 	}
 	closers = append(closers, cr)
 
-	indexReader, err := index.NewFileReaderWithOptions(filepath.Join(dir, indexFilename), cache, shardFunc)
+	indexReader, err := index.NewFileReaderWithOptions(filepath.Join(dir, indexFilename), cache)
 	if err != nil {
 		return nil, err
 	}
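
For code that opened blocks with an explicit shard function, migration means dropping the last argument. A sketch under the post-commit signature; the helper name, block directory, and nil cache provider are illustrative:

```go
package main

import (
	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/index"
)

// openBlockForQuerying shows the new call shape: the shardFunc argument is
// dropped and index sharding always uses labels.StableHash. cacheProvider
// may be nil.
func openBlockForQuerying(logger log.Logger, blockDir string, cacheProvider index.ReaderCacheProvider) (*tsdb.Block, error) {
	return tsdb.OpenBlockWithOptions(logger, blockDir, nil, cacheProvider)
}

func main() {
	b, err := openBlockForQuerying(log.NewNopLogger(), "/path/to/block", nil) // placeholder dir
	if err != nil {
		panic(err)
	}
	defer b.Close()
}
```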
@@ -91,7 +91,6 @@ type LeveledCompactor struct {
 	maxBlockChunkSegmentSize    int64
 	mergeFunc                   storage.VerticalChunkSeriesMergeFunc
 	enableOverlappingCompaction bool
-	shardFunc                   func(labels.Labels) uint64
 
 	concurrencyOpts LeveledCompactorConcurrencyOptions
 }
@@ -158,10 +157,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 
 // NewLeveledCompactor returns a LeveledCompactor.
 func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction, nil)
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
 }
 
-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool, shardFunc func(labels.Labels) uint64) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, errors.Errorf("at least one range must be provided")
 	}
@@ -184,7 +183,6 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 		mergeFunc:                   mergeFunc,
 		concurrencyOpts:             DefaultLeveledCompactorConcurrencyOptions(),
 		enableOverlappingCompaction: enableOverlappingCompaction,
-		shardFunc:                   shardFunc,
 	}, nil
 }
 
@@ -1107,7 +1105,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64,
 
 			obIx := uint64(0)
 			if len(outBlocks) > 1 {
-				obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
+				obIx = labels.StableHash(s.Labels()) % uint64(len(outBlocks))
 			}
 
 			err := blockWriters[obIx].addSeries(s.Labels(), chks)
@@ -1175,7 +1173,7 @@ func (c *LeveledCompactor) populateSymbols(sets []storage.ChunkSeriesSet, outBlo
 
 		var obIx uint64
 		if len(outBlocks) > 1 {
-			obIx = c.shardFunc(s.Labels()) % uint64(len(outBlocks))
+			obIx = labels.StableHash(s.Labels()) % uint64(len(outBlocks))
 		}
 
 		for _, l := range s.Labels() {
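
When compaction splits output into multiple blocks, the routing collapses to a modulo over the stable hash, identically in `populateBlock` and `populateSymbols`. A standalone sketch of that arithmetic; the helper name and label values are illustrative:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// outBlockIndex mirrors the routing in populateBlock/populateSymbols: with a
// single output block everything goes to index 0, otherwise the series lands
// in the block selected by its stable hash.
func outBlockIndex(lset labels.Labels, outBlocks int) uint64 {
	if outBlocks <= 1 {
		return 0
	}
	return labels.StableHash(lset) % uint64(outBlocks)
}

func main() {
	lset := labels.FromStrings("job", "api", "instance", "a")
	fmt.Println(outBlockIndex(lset, 13)) // deterministic across versions
}
```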
@@ -439,7 +439,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+	}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -503,10 +503,6 @@ func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sa
 	return ret
 }
 
-func shardFunc(l labels.Labels) uint64 {
-	return l.Hash()
-}
-
 func TestCompaction_CompactWithSplitting(t *testing.T) {
 	seriesCounts := []int{10, 1234}
 	shardCounts := []uint64{1, 13}
@@ -537,7 +533,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
 
 		for _, shardCount := range shardCounts {
 			t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
-				c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+				c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 				require.NoError(t, err)
 
 				blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
@@ -671,7 +667,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) {
 		blockDirs = append(blockDirs, bdir)
 	}
 
-	c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+	c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 	require.NoError(t, err)
 
 	blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
@@ -1146,7 +1142,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 			blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 		}
 
-		c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true, shardFunc)
+		c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
 		require.NoError(t, err)
 
 		meta := &BlockMeta{
tsdb/db.go (14 changed lines)

@@ -201,9 +201,6 @@ type Options struct {
 	HeadPostingsForMatchersCacheSize int
 	// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
 	HeadPostingsForMatchersCacheForce bool
-
-	// Compute hash of labels to divide series into shards.
-	ShardFunc func(l labels.Labels) uint64
 }
 
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -568,7 +565,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
 		return nil, ErrClosed
 	default:
 	}
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, nil)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -766,7 +763,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction, opts.ShardFunc)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
@@ -820,7 +817,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
 		headOpts.IsolationDisabled = opts.IsolationDisabled
 	}
-	headOpts.ShardFunc = opts.ShardFunc
 	db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
 	if err != nil {
 		return nil, err
@@ -1350,7 +1346,7 @@ func (db *DB) reloadBlocks() (err error) {
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.ShardFunc)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache)
 	if err != nil {
 		return err
 	}
@@ -1433,7 +1429,7 @@ func (db *DB) reloadBlocks() (err error) {
 	return nil
 }
 
-func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, shardFunc func(l labels.Labels) uint64) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
 	bDirs, err := blockDirs(dir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "find blocks")
@@ -1455,7 +1451,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 			cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
 		}
 
-		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, shardFunc)
+		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider)
 		if err != nil {
 			corrupted[meta.ULID] = err
 			continue
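
On the options side, `Options.ShardFunc` disappears entirely; callers that populated it just stop. A sketch of opening a TSDB after this change, assuming `tsdb.Open`, `tsdb.DefaultOptions`, and `tsdb.NewDBStats` as they exist in this Prometheus fork (the data directory is a placeholder):

```go
package main

import (
	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Options.ShardFunc no longer exists; head and compactor sharding are
	// fixed to labels.StableHash by this commit.
	db, err := tsdb.Open("/path/to/data", log.NewNopLogger(), nil, opts, tsdb.NewDBStats())
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```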
tsdb/head.go (16 changed lines)

@@ -176,8 +176,6 @@ type HeadOptions struct {
 	PostingsForMatchersCacheTTL   time.Duration
 	PostingsForMatchersCacheSize  int
 	PostingsForMatchersCacheForce bool
-
-	ShardFunc func(l labels.Labels) uint64 // Compute hash of labels to divide series into shards.
 }
 
 const (
@@ -1568,12 +1566,8 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
 }
 
 func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
-	shardHash := hash
-	if h.opts.ShardFunc != nil {
-		shardHash = h.opts.ShardFunc(lset)
-	}
 	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
-		return newMemSeries(lset, id, shardHash, h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
+		return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
 	})
 	if err != nil {
 		return nil, false, err
@@ -1862,9 +1856,11 @@ type memSeries struct {
 
 	ref  chunks.HeadSeriesRef
 	lset labels.Labels
-	hash uint64
 	meta *metadata.Metadata
 
+	// Series labels hash to use for sharding purposes.
+	shardHash uint64
+
 	// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
 	// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
 	//
@@ -1914,13 +1910,13 @@ type memSeriesOOOFields struct {
 	firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
 }
 
-func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, hash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
+func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
 	s := &memSeries{
 		lset:                 lset,
 		ref:                  id,
 		nextAt:               math.MinInt64,
 		chunkEndTimeVariance: chunkEndTimeVariance,
-		hash:                 hash,
+		shardHash:            shardHash,
 	}
 	if !isolationDisabled {
 		s.txs = newTxRing(4)
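
Every new `memSeries` now carries a `shardHash` derived from `labels.StableHash` rather than from whatever the caller's map hash happened to be. A hypothetical tsdb-package-internal helper capturing the pattern the updated tests below use (`defaultIsolationDisabled` is a test constant in this package; this helper itself is not part of the diff):

```go
// newSeriesWithStableShard sketches the new construction path: the shard
// hash is always labels.StableHash(lset), independent of the stripe-map hash.
func newSeriesWithStableShard(lset labels.Labels, ref chunks.HeadSeriesRef) *memSeries {
	return newMemSeries(lset, ref, labels.StableHash(lset), 0, defaultIsolationDisabled)
}
```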
@@ -1383,7 +1383,7 @@ func (s *memSeries) appendPreprocessor(
 		maxNextAt := s.nextAt
 
 		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt)
-		s.nextAt = addJitterToChunkEndTime(s.hash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
+		s.nextAt = addJitterToChunkEndTime(s.shardHash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
 	}
 	// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
 	// most likely because samples rate has changed and now they are arriving more frequently.
@@ -164,7 +164,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
 		}
 
 		// Check if the series belong to the shard.
-		if s.hash%shardCount != shardIndex {
+		if s.shardHash%shardCount != shardIndex {
 			continue
 		}
 
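
The shard membership test in the head is the same modulo check, now against the stored stable hash. An equivalent standalone predicate (the helper name and sample labels are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// belongsToShard mirrors the check in ShardedPostings: a series belongs to
// shardIndex iff StableHash(labels) % shardCount == shardIndex.
func belongsToShard(lset labels.Labels, shardIndex, shardCount uint64) bool {
	return labels.StableHash(lset)%shardCount == shardIndex
}

func main() {
	lset := labels.FromStrings("job", "api", "instance", "a")
	fmt.Println(belongsToShard(lset, 3, 13))
}
```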
@@ -242,7 +242,7 @@ func BenchmarkLoadWAL(b *testing.B) {
 			for k := 0; k < c.batches*c.seriesPerBatch; k++ {
 				// Create one mmapped chunk per series, with one sample at the given time.
 				lbls := labels.Labels{}
-				s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, lbls.Hash(), 0, defaultIsolationDisabled)
+				s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 				s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT)
 				s.mmapCurrentHeadChunk(chunkDiskMapper)
 			}
@@ -758,7 +758,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	}
 
 	lbls := labels.FromStrings("a", "b")
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	for i := 0; i < 4000; i += 5 {
 		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
@@ -1290,7 +1290,7 @@ func TestMemSeries_append(t *testing.T) {
 	const chunkRange = 500
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
@@ -1345,7 +1345,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
 	chunkRange := int64(1000)
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	histograms := GenerateTestHistograms(4)
 	histogramWithOneMoreBucket := histograms[3].Copy()
@@ -1402,7 +1402,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
 	chunkRange := DefaultBlockDuration
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	// At this slow rate, we will fill the chunk in two block durations.
 	slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@@ -2437,7 +2437,6 @@ func TestHeadShardedPostings(t *testing.T) {
 	defer func() {
 		require.NoError(t, head.Close())
 	}()
-	head.opts.ShardFunc = func(l labels.Labels) uint64 { return l.Hash() }
 
 	// Append some series.
 	app := head.Appender(context.Background())
@@ -2630,7 +2629,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
 	const chunkRange = 500
 
 	lbls := labels.Labels{}
-	s := newMemSeries(lbls, 1, lbls.Hash(), 0, defaultIsolationDisabled)
+	s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
 
 	for i := 0; i < 7; i++ {
 		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
@@ -1090,8 +1090,6 @@ type Reader struct {
 
 	// Provides a cache mapping series labels hash by series ID.
 	cacheProvider ReaderCacheProvider
-
-	shardFunc func(l labels.Labels) uint64 // Computes a hash of Labels; must be consistent over time.
 }
 
 type postingOffset struct {
@@ -1132,11 +1130,11 @@ func NewReaderWithCache(b ByteSlice, cacheProvider ReaderCacheProvider) (*Reader
 
 // NewFileReader returns a new index reader against the given index file.
 func NewFileReader(path string) (*Reader, error) {
-	return NewFileReaderWithOptions(path, nil, nil)
+	return NewFileReaderWithOptions(path, nil)
 }
 
 // NewFileReaderWithOptions is like NewFileReader but allows to pass a cache provider and sharding function.
-func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider, shardFunc func(l labels.Labels) uint64) (*Reader, error) {
+func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider) (*Reader, error) {
 	f, err := fileutil.OpenMmapFile(path)
 	if err != nil {
 		return nil, err
@@ -1148,7 +1146,6 @@ func NewFileReaderWithOptions(path string, cacheProvider ReaderCacheProvider, sh
 			f.Close(),
 		).Err()
 	}
-	r.shardFunc = shardFunc
 
 	return r, nil
 }
@@ -1774,7 +1771,7 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post
 			return ErrPostings(errors.Errorf("series %d not found", id))
 		}
 
-		hash = r.shardFunc(bufLbls.Labels())
+		hash = labels.StableHash(bufLbls.Labels())
 		if seriesHashCache != nil {
 			seriesHashCache.Store(id, hash)
 		}
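
Reading one shard of an on-disk index now goes through the two-argument reader constructor; the reader hashes every series with `labels.StableHash` itself (still consulting the optional hash cache). A sketch under assumed inputs (`indexPath`, `all`, and the package name are illustrative):

```go
package indexshard

import (
	"github.com/prometheus/prometheus/tsdb/index"
)

// consumeShard opens an index file and iterates only the postings whose
// stable series hash falls in shard shardIndex. The cache provider
// argument may be nil.
func consumeShard(indexPath string, all index.Postings, shardIndex, shardCount uint64) error {
	ir, err := index.NewFileReaderWithOptions(indexPath, nil)
	if err != nil {
		return err
	}
	defer ir.Close()

	p := ir.ShardedPostings(all, shardIndex, shardCount)
	for p.Next() {
		_ = p.At() // series ref in this shard
	}
	return p.Err()
}
```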
@@ -249,7 +249,7 @@ func TestIndexRW_Postings(t *testing.T) {
 		cache = hashcache.NewSeriesHashCache(1024 * 1024 * 1024).GetBlockCacheProvider("test")
 	}
 
-	ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
+	ir, err := NewFileReaderWithOptions(fn, cache)
 	require.NoError(t, err)
 
 	// List all postings for a given label value. This is what we expect to get
@@ -646,7 +646,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
 	}
 
 	// Create a reader to read back all postings from the index.
-	ir, err := NewFileReaderWithOptions(fn, cache, func(l labels.Labels) uint64 { return l.Hash() })
+	ir, err := NewFileReaderWithOptions(fn, cache)
 	require.NoError(b, err)
 
 	b.ResetTimer()
@@ -271,7 +271,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 
 	seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
 	blockdir := createBlockFromHead(b, tmpdir, h)
-	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), func(l labels.Labels) uint64 { return l.Hash() })
+	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"))
 	require.NoError(b, err)
 	defer func() {
 		require.NoError(b, block.Close())