mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Automatically remove incorrect snapshot with index that is ahead of WAL (#11859)
Signed-off-by: Rens Groothuijsen <l.groothuijsen@alumni.maastrichtuniversity.nl>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
5ec1b4baaf
commit
d33eb3ab17
49
tsdb/head.go
49
tsdb/head.go
|
@ -568,20 +568,47 @@ func (h *Head) Init(minValidTime int64) error {
|
||||||
|
|
||||||
if h.opts.EnableMemorySnapshotOnShutdown {
|
if h.opts.EnableMemorySnapshotOnShutdown {
|
||||||
level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
|
level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
|
||||||
var err error
|
// If there are any WAL files, there should be at least one WAL file with an index that is current or newer
|
||||||
snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
|
// than the snapshot index. If the WAL index is behind the snapshot index somehow, the snapshot is assumed
|
||||||
if err != nil {
|
// to be outdated.
|
||||||
snapIdx, snapOffset = -1, 0
|
loadSnapshot := true
|
||||||
refSeries = make(map[chunks.HeadSeriesRef]*memSeries)
|
if h.wal != nil {
|
||||||
|
_, endAt, err := wlog.Segments(h.wal.Dir())
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "finding WAL segments")
|
||||||
|
}
|
||||||
|
|
||||||
h.metrics.snapshotReplayErrorTotal.Inc()
|
_, idx, _, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
|
||||||
level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
|
if err != nil && err != record.ErrNotFound {
|
||||||
// We clear the partially loaded data to replay fresh from the WAL.
|
level.Error(h.logger).Log("msg", "Could not find last snapshot", "err", err)
|
||||||
if err := h.resetInMemoryState(); err != nil {
|
}
|
||||||
return err
|
|
||||||
|
if err == nil && endAt < idx {
|
||||||
|
loadSnapshot = false
|
||||||
|
level.Warn(h.logger).Log("msg", "Last WAL file is behind snapshot, removing snapshots")
|
||||||
|
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, math.MaxInt, math.MaxInt); err != nil {
|
||||||
|
level.Error(h.logger).Log("msg", "Error while deleting snapshot directories", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if loadSnapshot {
|
||||||
|
var err error
|
||||||
|
snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
|
||||||
|
if err == nil {
|
||||||
|
level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
snapIdx, snapOffset = -1, 0
|
||||||
|
refSeries = make(map[chunks.HeadSeriesRef]*memSeries)
|
||||||
|
|
||||||
|
h.metrics.snapshotReplayErrorTotal.Inc()
|
||||||
|
level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
|
||||||
|
// We clear the partially loaded data to replay fresh from the WAL.
|
||||||
|
if err := h.resetInMemoryState(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mmapChunkReplayStart := time.Now()
|
mmapChunkReplayStart := time.Now()
|
||||||
|
|
|
@ -4786,3 +4786,58 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
|
|
||||||
checkHeaders()
|
checkHeaders()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSnapshotAheadOfWALError(t *testing.T) {
|
||||||
|
head, _ := newTestHead(t, 120*4, false, false)
|
||||||
|
head.opts.EnableMemorySnapshotOnShutdown = true
|
||||||
|
// Add a sample to fill WAL.
|
||||||
|
app := head.Appender(context.Background())
|
||||||
|
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 10, 10)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
// Increment snapshot index to create sufficiently large difference.
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
_, err = head.wal.NextSegment()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, head.Close()) // This will create a snapshot.
|
||||||
|
|
||||||
|
_, idx, _, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, idx)
|
||||||
|
|
||||||
|
// Restart the WAL while keeping the old snapshot. The new head is created manually in this case in order
|
||||||
|
// to keep using the same snapshot directory instead of a random one.
|
||||||
|
require.NoError(t, os.RemoveAll(head.wal.Dir()))
|
||||||
|
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||||
|
w, _ := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||||
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// Add a sample to fill WAL.
|
||||||
|
app = head.Appender(context.Background())
|
||||||
|
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 10, 10)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
lastSegment, _, _ := w.LastSegmentAndOffset()
|
||||||
|
require.Equal(t, 0, lastSegment)
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
|
||||||
|
// New WAL is saved, but old snapshot still exists.
|
||||||
|
_, idx, _, err = LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, idx)
|
||||||
|
|
||||||
|
// Create new Head which should detect the incorrect index and delete the snapshot.
|
||||||
|
head.opts.EnableMemorySnapshotOnShutdown = true
|
||||||
|
w, _ = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||||
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, head.Init(math.MinInt64))
|
||||||
|
|
||||||
|
// Verify that snapshot directory does not exist anymore.
|
||||||
|
_, _, _, err = LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||||
|
require.Equal(t, record.ErrNotFound, err)
|
||||||
|
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue