diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go
index 0e971a336b..a55896a6d6 100644
--- a/tsdb/chunks/chunk_write_queue_test.go
+++ b/tsdb/chunks/chunk_write_queue_test.go
@@ -178,7 +178,8 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
 
 	// Wait until all jobs have been processed.
 	callbackWg.Wait()
-	require.True(t, q.queueIsEmpty())
+
+	require.Eventually(t, q.queueIsEmpty, 500*time.Millisecond, 50*time.Millisecond)
 }
 
 func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
diff --git a/tsdb/head.go b/tsdb/head.go
index e1ab11fd98..5663e3cf90 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -502,6 +502,8 @@ func (h *Head) Init(minValidTime int64) error {
 		snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
 		if err != nil {
 			snapIdx, snapOffset = -1, 0
+			refSeries = make(map[chunks.HeadSeriesRef]*memSeries)
+
 			h.metrics.snapshotReplayErrorTotal.Inc()
 			level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
 			// We clear the partially loaded data to replay fresh from the WAL.
@@ -519,9 +521,16 @@ func (h *Head) Init(minValidTime int64) error {
 		if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
 			h.metrics.mmapChunkCorruptionTotal.Inc()
 		}
+
+		// Discard snapshot data since we need to replay the WAL for the missed m-map chunks data.
+		snapIdx, snapOffset = -1, 0
+
 		// If this fails, data will be recovered from WAL.
 		// Hence we wont lose any data (given WAL is not corrupt).
-		mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
+		mmappedChunks, err = h.removeCorruptedMmappedChunks(err)
+		if err != nil {
+			return err
+		}
 	}
 
 	level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
@@ -630,7 +639,6 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 			return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
 				seriesRef, slice[len(slice)-1].minTime, slice[len(slice)-1].maxTime, mint, maxt)
 		}
-
 		slice = append(slice, &mmappedChunk{
 			ref:        chunkRef,
 			minTime:    mint,
@@ -672,7 +680,12 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 
 // removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
 // loaded mmapped chunks.
-func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.HeadSeriesRef]*memSeries) map[chunks.HeadSeriesRef][]*mmappedChunk {
+func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef][]*mmappedChunk, error) {
+	// We never want to preserve the in-memory series from snapshots if we are repairing m-map chunks.
+	if err := h.resetInMemoryState(); err != nil {
+		return nil, err
+	}
+
 	level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
 
 	if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
@@ -680,11 +693,11 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head
 		if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
 			level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed", "err", err)
 		}
-		return map[chunks.HeadSeriesRef][]*mmappedChunk{}
+		return map[chunks.HeadSeriesRef][]*mmappedChunk{}, nil
 	}
 
 	level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
-	mmappedChunks, err := h.loadMmappedChunks(refSeries)
+	mmappedChunks, err := h.loadMmappedChunks(make(map[chunks.HeadSeriesRef]*memSeries))
 	if err != nil {
 		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
 		if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
@@ -693,7 +706,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head
 		mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{}
 	}
 
-	return mmappedChunks
+	return mmappedChunks, nil
 }
 
 func (h *Head) ApplyConfig(cfg *config.Config) error {
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index eb4e2624bf..5d87bafbc8 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -3166,3 +3166,82 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
 
 	require.NoError(t, h.Close())
 }
+
+// Tests https://github.com/prometheus/prometheus/issues/10277.
+func TestReplayAfterMmapReplayError(t *testing.T) {
+	dir := t.TempDir()
+	var h *Head
+	var err error
+
+	openHead := func() {
+		wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
+		require.NoError(t, err)
+
+		opts := DefaultHeadOptions()
+		opts.ChunkRange = DefaultBlockDuration
+		opts.ChunkDirRoot = dir
+		opts.EnableMemorySnapshotOnShutdown = true
+		opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
+
+		h, err = NewHead(nil, nil, wlog, opts, nil)
+		require.NoError(t, err)
+		require.NoError(t, h.Init(0))
+	}
+
+	openHead()
+
+	itvl := int64(15 * time.Second / time.Millisecond)
+	lastTs := int64(0)
+	lbls := labels.FromStrings("__name__", "testing", "foo", "bar")
+	var expSamples []tsdbutil.Sample
+	addSamples := func(numSamples int) {
+		app := h.Appender(context.Background())
+		var ref storage.SeriesRef
+		for i := 0; i < numSamples; i++ {
+			ref, err = app.Append(ref, lbls, lastTs, float64(lastTs))
+			expSamples = append(expSamples, sample{t: lastTs, v: float64(lastTs)})
+			require.NoError(t, err)
+			lastTs += itvl
+			if i%10 == 0 {
+				require.NoError(t, app.Commit())
+				app = h.Appender(context.Background())
+			}
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	// Creating multiple m-map files.
+	for i := 0; i < 5; i++ {
+		addSamples(250)
+		require.NoError(t, h.Close())
+		if i != 4 {
+			// Don't open head for the last iteration.
+			openHead()
+		}
+	}
+
+	files, err := ioutil.ReadDir(filepath.Join(dir, "chunks_head"))
+	require.Equal(t, 5, len(files))
+
+	// Corrupt an m-map file.
+	mmapFilePath := filepath.Join(dir, "chunks_head", "000002")
+	f, err := os.OpenFile(mmapFilePath, os.O_WRONLY, 0o666)
+	require.NoError(t, err)
+	_, err = f.WriteAt([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 17)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	openHead()
+
+	// There should be fewer m-map files due to corruption.
+	files, err = ioutil.ReadDir(filepath.Join(dir, "chunks_head"))
+	require.Equal(t, 2, len(files))
+
+	// Querying should not panic.
+	q, err := NewBlockQuerier(h, 0, lastTs)
+	require.NoError(t, err)
+	res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing"))
+	require.Equal(t, map[string][]tsdbutil.Sample{lbls.String(): expSamples}, res)
+
+	require.NoError(t, h.Close())
+}