Mirror of https://github.com/prometheus/prometheus.git (synced 2025-01-11 13:57:36 -08:00)
Fix panic on query when m-map replay fails with snapshot enabled (#10348)
* Fix panic on query when m-map replay fails with snapshot enabled

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix review

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix flake

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent 0b835e341b
commit 4cc25c0cb0
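For orientation before the diff: the change makes Head.Init recover cleanly when the chunk snapshot or the m-mapped chunk replay fails, instead of carrying half-initialized in-memory series into WAL replay and later queries. The sketch below is a minimal, self-contained model of that recovery order; every type and helper in it is invented for illustration and is not the real tsdb API.

// Minimal, self-contained sketch of the recovery order the diff below
// enforces in (*Head).Init. All types and helpers here are invented for
// illustration; they are NOT the real tsdb API.
package main

import (
	"errors"
	"fmt"
)

type seriesRef uint64

type memSeries struct{}

type head struct {
	series map[seriesRef]*memSeries
}

// resetInMemoryState stands in for the real reset: it throws away everything
// built from the snapshot so nothing can reference discarded chunks.
func (h *head) resetInMemoryState() error {
	h.series = map[seriesRef]*memSeries{}
	return nil
}

// loadSnapshot / loadMmappedChunks are stubs that only report success or failure.
func (h *head) loadSnapshot(fail bool) (snapIdx, snapOffset int, err error) {
	if fail {
		return -1, 0, errors.New("snapshot corrupted")
	}
	return 3, 128, nil
}

func (h *head) loadMmappedChunks(fail bool) error {
	if fail {
		return errors.New("m-map chunk corrupted")
	}
	return nil
}

func (h *head) initSketch(snapshotFails, mmapFails bool) error {
	snapIdx, snapOffset, err := h.loadSnapshot(snapshotFails)
	if err != nil {
		// Snapshot failed: fall back to a fresh, non-nil series map so the
		// WAL replay that follows has something valid to populate.
		snapIdx, snapOffset = -1, 0
		h.series = map[seriesRef]*memSeries{}
	}

	if err := h.loadMmappedChunks(mmapFails); err != nil {
		// M-map replay failed: discard the snapshot position AND the partially
		// loaded in-memory series before retrying, then recover from the WAL.
		snapIdx, snapOffset = -1, 0
		if err := h.resetInMemoryState(); err != nil {
			return err
		}
		if err := h.loadMmappedChunks(false); err != nil {
			return err
		}
	}

	fmt.Println("WAL replay starts after snapshot position:", snapIdx, snapOffset)
	return nil
}

func main() {
	h := &head{series: map[seriesRef]*memSeries{}}
	if err := h.initSketch(false, true); err != nil {
		fmt.Println("init failed:", err)
	}
}

A rough reading of the real change, which the sketch mirrors: on snapshot failure the series map is re-created; on m-map replay failure the snapshot position is discarded, the in-memory state is reset, the corrupted chunk files are deleted, and m-mapping is reattempted against an empty series map before the WAL is replayed.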
@@ -178,7 +178,8 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
 	// Wait until all jobs have been processed.
 	callbackWg.Wait()
-	require.True(t, q.queueIsEmpty())
+
+	require.Eventually(t, q.queueIsEmpty, 500*time.Millisecond, 50*time.Millisecond)
 }
 
 func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
 
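For context on the "Fix flake" part of this commit: asserting q.queueIsEmpty() once can presumably race with the worker goroutine that removes completed jobs from the queue, whereas require.Eventually (from the testify library these tests already use) re-evaluates the condition until it holds or the timeout expires. A minimal, hypothetical sketch of that pattern, independent of the chunk write queue:

// Hypothetical standalone test (not part of this commit) showing the
// require.Eventually pattern used above: poll a condition instead of
// asserting it once, so a background goroutine has time to finish.
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestEventuallyPattern(t *testing.T) {
	done := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated background work
		close(done)
	}()

	isDone := func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	}

	// Re-check isDone every 50ms; fail only if it is still false after 500ms.
	require.Eventually(t, isDone, 500*time.Millisecond, 50*time.Millisecond)
}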
tsdb/head.go (25 changed lines)
@@ -502,6 +502,8 @@ func (h *Head) Init(minValidTime int64) error {
 		snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
 		if err != nil {
 			snapIdx, snapOffset = -1, 0
+			refSeries = make(map[chunks.HeadSeriesRef]*memSeries)
+
 			h.metrics.snapshotReplayErrorTotal.Inc()
 			level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
 			// We clear the partially loaded data to replay fresh from the WAL.
@@ -519,9 +521,16 @@ func (h *Head) Init(minValidTime int64) error {
 		if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
 			h.metrics.mmapChunkCorruptionTotal.Inc()
 		}
+
+		// Discard snapshot data since we need to replay the WAL for the missed m-map chunks data.
+		snapIdx, snapOffset = -1, 0
+
 		// If this fails, data will be recovered from WAL.
 		// Hence we wont lose any data (given WAL is not corrupt).
-		mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
+		mmappedChunks, err = h.removeCorruptedMmappedChunks(err)
+		if err != nil {
+			return err
+		}
 	}
 
 	level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
@@ -630,7 +639,6 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 			return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
 				seriesRef, slice[len(slice)-1].minTime, slice[len(slice)-1].maxTime, mint, maxt)
 		}
-
 		slice = append(slice, &mmappedChunk{
 			ref:     chunkRef,
 			minTime: mint,
@@ -672,7 +680,12 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 
 // removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
 // loaded mmapped chunks.
-func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.HeadSeriesRef]*memSeries) map[chunks.HeadSeriesRef][]*mmappedChunk {
+func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef][]*mmappedChunk, error) {
+	// We never want to preserve the in-memory series from snapshots if we are repairing m-map chunks.
+	if err := h.resetInMemoryState(); err != nil {
+		return nil, err
+	}
+
 	level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
 
 	if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
@@ -680,11 +693,11 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.HeadSeriesRef]*memSeries) map[chunks.HeadSeriesRef][]*mmappedChunk {
 		if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
 			level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed", "err", err)
 		}
-		return map[chunks.HeadSeriesRef][]*mmappedChunk{}
+		return map[chunks.HeadSeriesRef][]*mmappedChunk{}, nil
 	}
 
 	level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
-	mmappedChunks, err := h.loadMmappedChunks(refSeries)
+	mmappedChunks, err := h.loadMmappedChunks(make(map[chunks.HeadSeriesRef]*memSeries))
 	if err != nil {
 		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
 		if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
@@ -693,7 +706,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.HeadSeriesRef]*memSeries) map[chunks.HeadSeriesRef][]*mmappedChunk {
 		mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{}
 	}
 
-	return mmappedChunks
+	return mmappedChunks, nil
 }
 
 func (h *Head) ApplyConfig(cfg *config.Config) error {
@@ -3166,3 +3166,82 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
 
 	require.NoError(t, h.Close())
 }
+
+// Tests https://github.com/prometheus/prometheus/issues/10277.
+func TestReplayAfterMmapReplayError(t *testing.T) {
+	dir := t.TempDir()
+	var h *Head
+	var err error
+
+	openHead := func() {
+		wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
+		require.NoError(t, err)
+
+		opts := DefaultHeadOptions()
+		opts.ChunkRange = DefaultBlockDuration
+		opts.ChunkDirRoot = dir
+		opts.EnableMemorySnapshotOnShutdown = true
+		opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
+
+		h, err = NewHead(nil, nil, wlog, opts, nil)
+		require.NoError(t, err)
+		require.NoError(t, h.Init(0))
+	}
+
+	openHead()
+
+	itvl := int64(15 * time.Second / time.Millisecond)
+	lastTs := int64(0)
+	lbls := labels.FromStrings("__name__", "testing", "foo", "bar")
+	var expSamples []tsdbutil.Sample
+	addSamples := func(numSamples int) {
+		app := h.Appender(context.Background())
+		var ref storage.SeriesRef
+		for i := 0; i < numSamples; i++ {
+			ref, err = app.Append(ref, lbls, lastTs, float64(lastTs))
+			expSamples = append(expSamples, sample{t: lastTs, v: float64(lastTs)})
+			require.NoError(t, err)
+			lastTs += itvl
+			if i%10 == 0 {
+				require.NoError(t, app.Commit())
+				app = h.Appender(context.Background())
+			}
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	// Creating multiple m-map files.
+	for i := 0; i < 5; i++ {
+		addSamples(250)
+		require.NoError(t, h.Close())
+		if i != 4 {
+			// Don't open head for the last iteration.
+			openHead()
+		}
+	}
+
+	files, err := ioutil.ReadDir(filepath.Join(dir, "chunks_head"))
+	require.Equal(t, 5, len(files))
+
+	// Corrupt a m-map file.
+	mmapFilePath := filepath.Join(dir, "chunks_head", "000002")
+	f, err := os.OpenFile(mmapFilePath, os.O_WRONLY, 0o666)
+	require.NoError(t, err)
+	_, err = f.WriteAt([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 17)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	openHead()
+
+	// There should be less m-map files due to corruption.
+	files, err = ioutil.ReadDir(filepath.Join(dir, "chunks_head"))
+	require.Equal(t, 2, len(files))
+
+	// Querying should not panic.
+	q, err := NewBlockQuerier(h, 0, lastTs)
+	require.NoError(t, err)
+	res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "testing"))
+	require.Equal(t, map[string][]tsdbutil.Sample{lbls.String(): expSamples}, res)
+
+	require.NoError(t, h.Close())
+}