From 4cc25c0cb0b96042a7d36a0dd53dc6970ad607fd Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Date: Fri, 25 Feb 2022 21:23:40 +0530
Subject: [PATCH] Fix panic on query when m-map replay fails with snapshot
 enabled (#10348)

* Fix panic on query when m-map replay fails with snapshot enabled

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix review

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix flake

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
---
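Why this panicked: with EnableMemorySnapshotOnShutdown, Head.Init first
rebuilds the in-memory series from the chunk snapshot. If the m-map chunk
replay then fails, the corrupted chunk files are deleted, but the
snapshot-built series still hold references into them, and a later query
follows those stale references. The fix below resets the in-memory state
and discards the snapshot position before repairing, so everything is
replayed fresh from the WAL.

A minimal sketch of the failure mode in plain Go, deliberately not
Prometheus code (all names below are illustrative):

	package main

	import "fmt"

	type chunkRef int

	// series keeps references into on-disk m-map files, the way snapshot
	// replay leaves memSeries pointing at mmapped chunks.
	type series struct {
		mmapped []chunkRef
	}

	func main() {
		onDisk := map[chunkRef][]byte{1: {0xA}, 2: {0xB}}

		// Snapshot replay: the series comes back holding refs into onDisk.
		s := &series{mmapped: []chunkRef{1, 2}}

		// M-map replay fails and the repair path deletes the corrupted
		// file...
		delete(onDisk, 2)
		// ...but, without resetting the in-memory state, s still holds
		// ref 2.

		for _, ref := range s.mmapped {
			data, ok := onDisk[ref]
			if !ok {
				// In the real Head this surfaced as a panic at
				// query time.
				fmt.Println("stale chunk reference:", ref)
				continue
			}
			fmt.Println("read chunk:", data)
		}
	}
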
 tsdb/chunks/chunk_write_queue_test.go |  3 +-
 tsdb/head.go                          | 25 +++++++--
 tsdb/head_test.go                     | 81 +++++++++++++++++++++++++++
 3 files changed, 102 insertions(+), 7 deletions(-)

diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go
index 0e971a336b..a55896a6d6 100644
--- a/tsdb/chunks/chunk_write_queue_test.go
+++ b/tsdb/chunks/chunk_write_queue_test.go
@@ -178,7 +178,8 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
 
 	// Wait until all jobs have been processed.
 	callbackWg.Wait()
-	require.True(t, q.queueIsEmpty())
+
+	require.Eventually(t, q.queueIsEmpty, 500*time.Millisecond, 50*time.Millisecond)
 }
 
 func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
diff --git a/tsdb/head.go b/tsdb/head.go
index e1ab11fd98..5663e3cf90 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -502,6 +502,8 @@ func (h *Head) Init(minValidTime int64) error {
 		snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
 		if err != nil {
 			snapIdx, snapOffset = -1, 0
+			refSeries = make(map[chunks.HeadSeriesRef]*memSeries)
+
 			h.metrics.snapshotReplayErrorTotal.Inc()
 			level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
 			// We clear the partially loaded data to replay fresh from the WAL.
@@ -519,9 +521,16 @@ func (h *Head) Init(minValidTime int64) error {
 		if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
 			h.metrics.mmapChunkCorruptionTotal.Inc()
 		}
+
+		// Discard snapshot data since we need to replay the WAL to recover the data from the deleted m-map chunks.
+		snapIdx, snapOffset = -1, 0
+
 		// If this fails, data will be recovered from WAL.
 		// Hence we won't lose any data (given the WAL is not corrupt).
-		mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
+		mmappedChunks, err = h.removeCorruptedMmappedChunks(err)
+		if err != nil {
+			return err
+		}
 	}
 
 	level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
@@ -630,7 +639,6 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 				return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
 					seriesRef, slice[len(slice)-1].minTime, slice[len(slice)-1].maxTime, mint, maxt)
 			}
-
 			slice = append(slice, &mmappedChunk{
 				ref:        chunkRef,
 				minTime:    mint,
@@ -672,7 +680,12 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 
 // removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks, and if that fails, it clears all the previously
 // loaded mmapped chunks.
-func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.HeadSeriesRef]*memSeries) map[chunks.HeadSeriesRef][]*mmappedChunk {
+func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef][]*mmappedChunk, error) {
+	// We never want to preserve the in-memory series from snapshots if we are repairing m-map chunks.
+	if err := h.resetInMemoryState(); err != nil {
+		return nil, err
+	}
+
 	level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
 
 	if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
@@ -680,11 +693,11 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head
 		if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
 			level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed", "err", err)
 		}
-		return map[chunks.HeadSeriesRef][]*mmappedChunk{}
+		return map[chunks.HeadSeriesRef][]*mmappedChunk{}, nil
 	}
 
 	level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
-	mmappedChunks, err := h.loadMmappedChunks(refSeries)
+	mmappedChunks, err := h.loadMmappedChunks(make(map[chunks.HeadSeriesRef]*memSeries))
 	if err != nil {
 		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
 		if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
@@ -693,7 +706,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head
 		mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{}
 	}
 
-	return mmappedChunks
+	return mmappedChunks, nil
 }
 
 func (h *Head) ApplyConfig(cfg *config.Config) error {
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index eb4e2624bf..5d87bafbc8 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -3166,3 +3166,84 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
 
 	require.NoError(t, h.Close())
 }
+
+// Tests https://github.com/prometheus/prometheus/issues/10277.
+func TestReplayAfterMmapReplayError(t *testing.T) {
+	dir := t.TempDir()
+	var h *Head
+	var err error
+
+	openHead := func() {
+		wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
+		require.NoError(t, err)
+
+		opts := DefaultHeadOptions()
+		opts.ChunkRange = DefaultBlockDuration
+		opts.ChunkDirRoot = dir
+		opts.EnableMemorySnapshotOnShutdown = true
+		opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
+
+		h, err = NewHead(nil, nil, wlog, opts, nil)
+		require.NoError(t, err)
+		require.NoError(t, h.Init(0))
+	}
+
+	openHead()
+
+	itvl := int64(15 * time.Second / time.Millisecond)
+	lastTs := int64(0)
+	lbls := labels.FromStrings("__name__", "testing", "foo", "bar")
+	var expSamples []tsdbutil.Sample
+	addSamples := func(numSamples int) {
+		app := h.Appender(context.Background())
+		var ref storage.SeriesRef
+		for i := 0; i < numSamples; i++ {
+			ref, err = app.Append(ref, lbls, lastTs, float64(lastTs))
+			require.NoError(t, err)
+			expSamples = append(expSamples, sample{t: lastTs, v: float64(lastTs)})
+			lastTs += itvl
+			if i%10 == 0 {
+				require.NoError(t, app.Commit())
+				app = h.Appender(context.Background())
+			}
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	// Create multiple m-map files.
+	for i := 0; i < 5; i++ {
+		addSamples(250)
+		require.NoError(t, h.Close())
+		if i != 4 {
+			// Reopen the head on every iteration except the last.
+			openHead()
+		}
+	}
+
+	files, err := ioutil.ReadDir(filepath.Join(dir, "chunks_head"))
+	require.NoError(t, err)
+	require.Equal(t, 5, len(files))
+
+	// Corrupt an m-map file.
+	mmapFilePath := filepath.Join(dir, "chunks_head", "000002")
+	f, err := os.OpenFile(mmapFilePath, os.O_WRONLY, 0o666)
+	require.NoError(t, err)
+	_, err = f.WriteAt([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 17)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	openHead()
+
+	// There should be fewer m-map files due to the corruption.
+	files, err = ioutil.ReadDir(filepath.Join(dir, "chunks_head"))
+	require.NoError(t, err)
+	require.Equal(t, 2, len(files))
+
+	// Querying should not panic.
+	q, err := NewBlockQuerier(h, 0, lastTs)
+	require.NoError(t, err)
+	res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing"))
+	require.Equal(t, map[string][]tsdbutil.Sample{lbls.String(): expSamples}, res)
+
+	require.NoError(t, h.Close())
+}
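
Note on the "Fix flake" change in chunk_write_queue_test.go: the queue's
internal bookkeeping can trail the job callbacks by a moment, so asserting
q.queueIsEmpty() immediately after callbackWg.Wait() was racy.
require.Eventually polls the condition instead of sampling it once. A
self-contained sketch of the same pattern, with illustrative names that
are not part of this patch:

	package example

	import (
		"sync/atomic"
		"testing"
		"time"

		"github.com/stretchr/testify/require"
	)

	func TestAsyncDrain(t *testing.T) {
		var pending int64 = 1

		// A background worker finishes shortly after being kicked off,
		// mirroring the queue still draining after callbackWg.Wait().
		go func() {
			time.Sleep(10 * time.Millisecond)
			atomic.StoreInt64(&pending, 0)
		}()

		// require.True(t, ...) here would be racy; Eventually re-checks
		// every 50ms until the condition holds or 500ms elapse.
		require.Eventually(t, func() bool { return atomic.LoadInt64(&pending) == 0 },
			500*time.Millisecond, 50*time.Millisecond)
	}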