head.deleted -> head.walExpiries

Signed-off-by: Patryk Prus <p@trykpr.us>
This commit is contained in:
Patryk Prus 2025-02-28 11:30:08 -05:00
parent 72d1fc4322
commit 786d3d99ba
No known key found for this signature in database
GPG key ID: 795650115CA6A58F
3 changed files with 29 additions and 29 deletions

View file

@ -107,8 +107,8 @@ type Head struct {
// All series addressable by their ID or hash.
series *stripeSeries
deletedMtx sync.Mutex
deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.
walExpiriesMtx sync.Mutex
walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until.
// TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings.
postings *index.MemPostings // Postings lists for terms.
@ -340,7 +340,7 @@ func (h *Head) resetInMemoryState() error {
h.exemplars = es
h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones()
h.deleted = map[chunks.HeadSeriesRef]int{}
h.walExpiries = map[chunks.HeadSeriesRef]int{}
h.chunkRange.Store(h.opts.ChunkRange)
h.minTime.Store(math.MaxInt64)
h.maxTime.Store(math.MinInt64)
@ -1262,19 +1262,19 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
return false, false, 0
}
func (h *Head) checkDeleted(id chunks.HeadSeriesRef) (int, bool) {
h.deletedMtx.Lock()
defer h.deletedMtx.Unlock()
func (h *Head) checkWALExpiry(id chunks.HeadSeriesRef) (int, bool) {
h.walExpiriesMtx.Lock()
defer h.walExpiriesMtx.Unlock()
keepUntil, ok := h.deleted[id]
keepUntil, ok := h.walExpiries[id]
return keepUntil, ok
}
func (h *Head) markDeleted(id chunks.HeadSeriesRef, keepUntil int) {
h.deletedMtx.Lock()
defer h.deletedMtx.Unlock()
func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) {
h.walExpiriesMtx.Lock()
defer h.walExpiriesMtx.Unlock()
h.deleted[id] = keepUntil
h.walExpiries[id] = keepUntil
}
// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
@ -1285,8 +1285,8 @@ func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool
return true
}
// Keep the record if the series was recently deleted.
keepUntil, ok := h.checkDeleted(id)
// Keep the record if the series has an expiry set.
keepUntil, ok := h.checkWALExpiry(id)
return ok && keepUntil > last
}
@ -1339,15 +1339,15 @@ func (h *Head) truncateWAL(mint int64) error {
h.logger.Error("truncating segments failed", "err", err)
}
// The checkpoint is written and segments before it is truncated, so we no
// longer need to track deleted series that are before it.
h.deletedMtx.Lock()
for ref, segment := range h.deleted {
// The checkpoint is written and segments before it is truncated, so stop
// tracking expired series.
h.walExpiriesMtx.Lock()
for ref, segment := range h.walExpiries {
if segment <= last {
delete(h.deleted, ref)
delete(h.walExpiries, ref)
}
}
h.deletedMtx.Unlock()
h.walExpiriesMtx.Unlock()
h.metrics.checkpointDeleteTotal.Inc()
if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
@ -1614,7 +1614,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
if h.wal != nil {
_, last, _ := wlog.Segments(h.wal.Dir())
h.deletedMtx.Lock()
h.walExpiriesMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have samples records with
// this ref ID. If we didn't keep these series records then
@ -1622,9 +1622,9 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
h.deleted[chunks.HeadSeriesRef(ref)] = last
h.walExpiries[chunks.HeadSeriesRef(ref)] = last
}
h.deletedMtx.Unlock()
h.walExpiriesMtx.Unlock()
}
return actualInOrderMint, minOOOTime, minMmapFile

View file

@ -728,14 +728,14 @@ func TestHead_ReadWAL(t *testing.T) {
// Duplicate series record should not be written to the head.
require.Nil(t, s101)
// But it should be marked as deleted.
keepUntil, ok := head.checkDeleted(101)
// But it should have a WAL expiry set.
keepUntil, ok := head.checkWALExpiry(101)
require.True(t, ok)
_, last, err := wlog.Segments(w.Dir())
require.NoError(t, err)
require.Equal(t, last, keepUntil)
// Only the duplicate series record should be marked as deleted.
_, ok = head.checkDeleted(50)
// Only the duplicate series record should have a WAL expiry set.
_, ok = head.checkWALExpiry(50)
require.False(t, ok)
expandChunk := func(c chunkenc.Iterator) (x []sample) {
@ -852,7 +852,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
{
name: "keep deleted series with keepUntil > last",
prepare: func(_ *testing.T, h *Head) {
h.markDeleted(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
},
seriesRef: chunks.HeadSeriesRef(existingRef),
last: deletedKeepUntil - 1,
@ -861,7 +861,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
{
name: "drop deleted series with keepUntil <= last",
prepare: func(_ *testing.T, h *Head) {
h.markDeleted(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
},
seriesRef: chunks.HeadSeriesRef(existingRef),
last: deletedKeepUntil,

View file

@ -243,7 +243,7 @@ Outer:
if !created {
multiRef[walSeries.Ref] = mSeries.ref
// Mark the walSeries as deleted, so it is kept in subsequent WAL checkpoints.
h.markDeleted(walSeries.Ref, last)
h.setWALExpiry(walSeries.Ref, last)
}
idx := uint64(mSeries.ref) % uint64(concurrency)