mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Fix Possible Race Condition in TSDB (#7815)
* Replace tsdb chunk mapper size with atomic; protect mmappedChunkFiles with read path mutex on DeleteCorrupted Signed-off-by: Max Neverov <neverov.max@gmail.com> * PR fixes Signed-off-by: Max Neverov <neverov.max@gmail.com>
This commit is contained in:
parent
ca6f2bde94
commit
bb5c6b38e2
|
@ -106,7 +106,7 @@ type ChunkDiskMapper struct {
|
||||||
|
|
||||||
// The total size of bytes in the closed files.
|
// The total size of bytes in the closed files.
|
||||||
// Needed to calculate the total size of all segments on disk.
|
// Needed to calculate the total size of all segments on disk.
|
||||||
size int64
|
size atomic.Int64
|
||||||
|
|
||||||
// If 'true', it indicated that the maxt of all the on-disk files were set
|
// If 'true', it indicated that the maxt of all the on-disk files were set
|
||||||
// after iterating through all the chunks in those files.
|
// after iterating through all the chunks in those files.
|
||||||
|
@ -178,7 +178,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||||
chkFileIndices = append(chkFileIndices, seq)
|
chkFileIndices = append(chkFileIndices, seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
cdm.size = 0
|
cdm.size.Store(int64(0))
|
||||||
|
|
||||||
// Check for gaps in the files.
|
// Check for gaps in the files.
|
||||||
sort.Ints(chkFileIndices)
|
sort.Ints(chkFileIndices)
|
||||||
|
@ -207,7 +207,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||||
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
|
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
|
||||||
}
|
}
|
||||||
|
|
||||||
cdm.size += int64(b.byteSlice.Len())
|
cdm.size.Add(int64(b.byteSlice.Len()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -340,7 +340,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
cdm.size += cdm.curFileSize()
|
cdm.size.Add(cdm.curFileSize())
|
||||||
cdm.curFileNumBytes.Store(int64(n))
|
cdm.curFileNumBytes.Store(int64(n))
|
||||||
|
|
||||||
if cdm.curFile != nil {
|
if cdm.curFile != nil {
|
||||||
|
@ -354,6 +354,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cdm.readPathMtx.Lock()
|
||||||
cdm.curFileSequence = seq
|
cdm.curFileSequence = seq
|
||||||
cdm.curFile = newFile
|
cdm.curFile = newFile
|
||||||
if cdm.chkWriter != nil {
|
if cdm.chkWriter != nil {
|
||||||
|
@ -362,7 +363,6 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
|
||||||
cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize)
|
cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
cdm.readPathMtx.Lock()
|
|
||||||
cdm.closers[cdm.curFileSequence] = mmapFile
|
cdm.closers[cdm.curFileSequence] = mmapFile
|
||||||
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
|
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
|
||||||
cdm.readPathMtx.Unlock()
|
cdm.readPathMtx.Unlock()
|
||||||
|
@ -693,7 +693,7 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
|
||||||
cdm.readPathMtx.Unlock()
|
cdm.readPathMtx.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cdm.size -= int64(cdm.mmappedChunkFiles[seq].byteSlice.Len())
|
cdm.size.Sub(int64(cdm.mmappedChunkFiles[seq].byteSlice.Len()))
|
||||||
delete(cdm.mmappedChunkFiles, seq)
|
delete(cdm.mmappedChunkFiles, seq)
|
||||||
delete(cdm.closers, seq)
|
delete(cdm.closers, seq)
|
||||||
}
|
}
|
||||||
|
@ -720,17 +720,20 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
|
||||||
|
|
||||||
// Delete all the head chunk files following the corrupt head chunk file.
|
// Delete all the head chunk files following the corrupt head chunk file.
|
||||||
segs := []int{}
|
segs := []int{}
|
||||||
|
cdm.readPathMtx.RLock()
|
||||||
for seg := range cdm.mmappedChunkFiles {
|
for seg := range cdm.mmappedChunkFiles {
|
||||||
if seg >= cerr.FileIndex {
|
if seg >= cerr.FileIndex {
|
||||||
segs = append(segs, seg)
|
segs = append(segs, seg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cdm.readPathMtx.RUnlock()
|
||||||
|
|
||||||
return cdm.deleteFiles(segs)
|
return cdm.deleteFiles(segs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of the chunk files.
|
// Size returns the size of the chunk files.
|
||||||
func (cdm *ChunkDiskMapper) Size() int64 {
|
func (cdm *ChunkDiskMapper) Size() int64 {
|
||||||
return cdm.size + cdm.curFileSize()
|
return cdm.size.Load() + cdm.curFileSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cdm *ChunkDiskMapper) curFileSize() int64 {
|
func (cdm *ChunkDiskMapper) curFileSize() int64 {
|
||||||
|
|
Loading…
Reference in a new issue