diff --git a/tsdb/head.go b/tsdb/head.go index 43b8e10ab1..95f4e75212 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -762,7 +762,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil { + if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil { return fmt.Errorf("backfill checkpoint: %w", err) } h.updateWALReplayStatusRead(startFrom) @@ -795,7 +795,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return fmt.Errorf("segment reader (offset=%d): %w", offset, err) } - err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks) + err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt) if err := sr.Close(); err != nil { h.logger.Warn("Error while closing the wal segments reader", "err", err) } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 39c47af46b..9385777aaf 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -50,7 +50,7 @@ type histogramRecord struct { fh *histogram.FloatHistogram } -func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { +func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 @@ -74,11 +74,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decodeErr, seriesCreationErr error ) - _, last, err := wlog.Segments(h.wal.Dir()) - if err != nil { - return fmt.Errorf("failed to get last segment to set WAL expiry for duplicate series: %w", err) - } - defer func() { // For CorruptionErr ensure to terminate all workers before exiting. _, ok := err.(*wlog.CorruptionErr) @@ -243,7 +238,7 @@ Outer: if !created { multiRef[walSeries.Ref] = mSeries.ref // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. - h.setWALExpiry(walSeries.Ref, last) + h.setWALExpiry(walSeries.Ref, lastSegment) } idx := uint64(mSeries.ref) % uint64(concurrency)