mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Pass last segment number to loadWAL instead
Signed-off-by: Patryk Prus <p@trykpr.us>
This commit is contained in:
parent
c3a80df194
commit
d55d468e8f
|
@ -762,7 +762,7 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
|
|
||||||
// A corrupted checkpoint is a hard error for now and requires user
|
// A corrupted checkpoint is a hard error for now and requires user
|
||||||
// intervention. There's likely little data that can be recovered anyway.
|
// intervention. There's likely little data that can be recovered anyway.
|
||||||
if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
|
if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil {
|
||||||
return fmt.Errorf("backfill checkpoint: %w", err)
|
return fmt.Errorf("backfill checkpoint: %w", err)
|
||||||
}
|
}
|
||||||
h.updateWALReplayStatusRead(startFrom)
|
h.updateWALReplayStatusRead(startFrom)
|
||||||
|
@ -795,7 +795,7 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
|
return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
|
||||||
}
|
}
|
||||||
err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
|
err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt)
|
||||||
if err := sr.Close(); err != nil {
|
if err := sr.Close(); err != nil {
|
||||||
h.logger.Warn("Error while closing the wal segments reader", "err", err)
|
h.logger.Warn("Error while closing the wal segments reader", "err", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ type histogramRecord struct {
|
||||||
fh *histogram.FloatHistogram
|
fh *histogram.FloatHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
|
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) {
|
||||||
// Track number of samples that referenced a series we don't know about
|
// Track number of samples that referenced a series we don't know about
|
||||||
// for error reporting.
|
// for error reporting.
|
||||||
var unknownRefs atomic.Uint64
|
var unknownRefs atomic.Uint64
|
||||||
|
@ -74,11 +74,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||||
decodeErr, seriesCreationErr error
|
decodeErr, seriesCreationErr error
|
||||||
)
|
)
|
||||||
|
|
||||||
_, last, err := wlog.Segments(h.wal.Dir())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get last segment to set WAL expiry for duplicate series: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// For CorruptionErr ensure to terminate all workers before exiting.
|
// For CorruptionErr ensure to terminate all workers before exiting.
|
||||||
_, ok := err.(*wlog.CorruptionErr)
|
_, ok := err.(*wlog.CorruptionErr)
|
||||||
|
@ -243,7 +238,7 @@ Outer:
|
||||||
if !created {
|
if !created {
|
||||||
multiRef[walSeries.Ref] = mSeries.ref
|
multiRef[walSeries.Ref] = mSeries.ref
|
||||||
// Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints.
|
// Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints.
|
||||||
h.setWALExpiry(walSeries.Ref, last)
|
h.setWALExpiry(walSeries.Ref, lastSegment)
|
||||||
}
|
}
|
||||||
|
|
||||||
idx := uint64(mSeries.ref) % uint64(concurrency)
|
idx := uint64(mSeries.ref) % uint64(concurrency)
|
||||||
|
|
Loading…
Reference in a new issue