Store mmMaxTime in same field as seriesShard

We don't use seriesShard during DB initialization, so we can use the
same 8 bytes to store mmMaxTime, and save those during the rest of the
lifetime of the database.

This doesn't affect CPU performance.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
Oleg Zaytsev 2024-07-30 10:20:29 +02:00
parent b7f2f3c3ac
commit d8e1b6bdfd
No known key found for this signature in database
GPG key ID: 7E9FE9FD48F512EF
3 changed files with 44 additions and 17 deletions

View file

@ -178,6 +178,7 @@ type HeadOptions struct {
WALReplayConcurrency int
// EnableSharding enables ShardedPostings() support in the Head.
// EnableSharding is temporarily disabled during Init().
EnableSharding bool
}
@ -609,7 +610,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
func (h *Head) Init(minValidTime int64) (err error) {
h.minValidTime.Store(minValidTime)
defer func() {
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
@ -623,6 +624,24 @@ func (h *Head) Init(minValidTime int64) error {
}
}()
// If sharding is enabled, disable it while initializing, and calculate the shards later.
// We're going to use that field for other purposes during WAL replay,
// so we don't want to waste time on calculating the shard that we're going to lose anyway.
if h.opts.EnableSharding {
h.opts.EnableSharding = false
defer func() {
if err == nil {
h.opts.EnableSharding = true
// No locking is needed here as nobody should be writing while we're in Init.
for _, stripe := range h.series.series {
for _, s := range stripe {
s.shardHashOrMemoryMappedMaxTime = labels.StableHash(s.lset)
}
}
}
}()
}
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
start := time.Now()
@ -683,7 +702,6 @@ func (h *Head) Init(minValidTime int64) error {
mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
lastMmapRef chunks.ChunkDiskMapperRef
err error
mmapChunkReplayDuration time.Duration
)
@ -2068,9 +2086,11 @@ type memSeries struct {
ref chunks.HeadSeriesRef
meta *metadata.Metadata
// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
// been explicitly enabled in TSDB.
shardHash uint64
// Series labels hash to use for sharding purposes.
// The value is always 0 when sharding has not been explicitly enabled in TSDB.
// While the WAL replay the value stored here is the max time of any mmapped chunk,
// and the shard hash is re-calculated after WAL replay is complete.
shardHashOrMemoryMappedMaxTime uint64
// Everything after here should only be accessed with the lock held.
sync.Mutex
@ -2095,8 +2115,6 @@ type memSeries struct {
ooo *memSeriesOOOFields
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
nextAt int64 // Timestamp at which to cut the next chunk.
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
pendingCommit bool // Whether there are samples waiting to be committed to this series.
@ -2127,10 +2145,10 @@ type memSeriesOOOFields struct {
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
nextAt: math.MinInt64,
shardHash: shardHash,
lset: lset,
ref: id,
nextAt: math.MinInt64,
shardHashOrMemoryMappedMaxTime: shardHash,
}
if !isolationDisabled {
s.txs = newTxRing(0)
@ -2218,6 +2236,12 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
return removedInOrder + removedOOO
}
// shardHash returns the shard hash of the series, only available after WAL replay.
func (s *memSeries) shardHash() uint64 { return s.shardHashOrMemoryMappedMaxTime }
// mmMaxTime returns the max time of any mmapped chunk in the series, only available during WAL replay.
func (s *memSeries) mmMaxTime() int64 { return int64(s.shardHashOrMemoryMappedMaxTime) }
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
// acquiring lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {

View file

@ -170,7 +170,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
}
// Check if the series belong to the shard.
if s.shardHash%shardCount != shardIndex {
if s.shardHash()%shardCount != shardIndex {
continue
}

View file

@ -435,6 +435,8 @@ Outer:
return nil
}
func minInt64() int64 { return math.MinInt64 }
// resetSeriesWithMMappedChunks is only used during the WAL replay.
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
if mSeries.ref != walSeriesRef {
@ -481,10 +483,11 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
}
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
if len(mmc) == 0 {
mSeries.mmMaxTime = math.MinInt64
mSeries.shardHashOrMemoryMappedMaxTime = uint64(minInt64())
} else {
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
mmMaxTime := mmc[len(mmc)-1].maxTime
mSeries.shardHashOrMemoryMappedMaxTime = uint64(mmMaxTime)
h.updateMinMaxTime(mmc[0].minTime, mmMaxTime)
}
if len(oooMmc) != 0 {
// Mint and maxt can be in any chunk, they are not sorted.
@ -585,7 +588,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
unknownRefs++
continue
}
if s.T <= ms.mmMaxTime {
if s.T <= ms.mmMaxTime() {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
@ -614,7 +617,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
unknownHistogramRefs++
continue
}
if s.t <= ms.mmMaxTime {
if s.t <= ms.mmMaxTime() {
continue
}
var chunkCreated bool