mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
TSDB: refactor cleanup of chunks and series
Extract the middle of the loop into a function, so it will be easier to modify the `seriesHashmap` data structure. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
ab2a7bb74f
commit
071d5732af
119
tsdb/head.go
119
tsdb/head.go
|
@ -1777,70 +1777,73 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|||
deletedFromPrevStripe = 0
|
||||
)
|
||||
minMmapFile = math.MaxInt32
|
||||
// Run through all series and truncate old chunks. Mark those with no
|
||||
// chunks left as deleted and store their ID.
|
||||
|
||||
// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
|
||||
check := func(i int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
|
||||
series.Lock()
|
||||
defer series.Unlock()
|
||||
|
||||
rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef)
|
||||
|
||||
if len(series.mmappedChunks) > 0 {
|
||||
seq, _ := series.mmappedChunks[0].ref.Unpack()
|
||||
if seq < minMmapFile {
|
||||
minMmapFile = seq
|
||||
}
|
||||
}
|
||||
if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
|
||||
seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
|
||||
if seq < minMmapFile {
|
||||
minMmapFile = seq
|
||||
}
|
||||
for _, ch := range series.ooo.oooMmappedChunks {
|
||||
if ch.minTime < minOOOTime {
|
||||
minOOOTime = ch.minTime
|
||||
}
|
||||
}
|
||||
}
|
||||
if series.ooo != nil && series.ooo.oooHeadChunk != nil {
|
||||
if series.ooo.oooHeadChunk.minTime < minOOOTime {
|
||||
minOOOTime = series.ooo.oooHeadChunk.minTime
|
||||
}
|
||||
}
|
||||
if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit ||
|
||||
(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
|
||||
seriesMint := series.minTime()
|
||||
if seriesMint < actualMint {
|
||||
actualMint = seriesMint
|
||||
}
|
||||
return
|
||||
}
|
||||
// The series is gone entirely. We need to keep the series lock
|
||||
// and make sure we have acquired the stripe locks for hash and ID of the
|
||||
// series alike.
|
||||
// If we don't hold them all, there's a very small chance that a series receives
|
||||
// samples again while we are half-way into deleting it.
|
||||
j := int(series.ref) & (s.size - 1)
|
||||
|
||||
if i != j {
|
||||
s.locks[j].Lock()
|
||||
}
|
||||
|
||||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||
s.hashes[i].del(hash, series.lset)
|
||||
delete(s.series[j], series.ref)
|
||||
deletedForCallback[series.ref] = series.lset
|
||||
|
||||
if i != j {
|
||||
s.locks[j].Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Run through all series shard by shard, checking which should be deleted.
|
||||
for i := 0; i < s.size; i++ {
|
||||
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
|
||||
s.locks[i].Lock()
|
||||
|
||||
for hash, all := range s.hashes[i] {
|
||||
for _, series := range all {
|
||||
series.Lock()
|
||||
rmChunks += series.truncateChunksBefore(mint, minOOOMmapRef)
|
||||
|
||||
if len(series.mmappedChunks) > 0 {
|
||||
seq, _ := series.mmappedChunks[0].ref.Unpack()
|
||||
if seq < minMmapFile {
|
||||
minMmapFile = seq
|
||||
}
|
||||
}
|
||||
if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
|
||||
seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
|
||||
if seq < minMmapFile {
|
||||
minMmapFile = seq
|
||||
}
|
||||
for _, ch := range series.ooo.oooMmappedChunks {
|
||||
if ch.minTime < minOOOTime {
|
||||
minOOOTime = ch.minTime
|
||||
}
|
||||
}
|
||||
}
|
||||
if series.ooo != nil && series.ooo.oooHeadChunk != nil {
|
||||
if series.ooo.oooHeadChunk.minTime < minOOOTime {
|
||||
minOOOTime = series.ooo.oooHeadChunk.minTime
|
||||
}
|
||||
}
|
||||
if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit ||
|
||||
(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
|
||||
seriesMint := series.minTime()
|
||||
if seriesMint < actualMint {
|
||||
actualMint = seriesMint
|
||||
}
|
||||
series.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// The series is gone entirely. We need to keep the series lock
|
||||
// and make sure we have acquired the stripe locks for hash and ID of the
|
||||
// series alike.
|
||||
// If we don't hold them all, there's a very small chance that a series receives
|
||||
// samples again while we are half-way into deleting it.
|
||||
j := int(series.ref) & (s.size - 1)
|
||||
|
||||
if i != j {
|
||||
s.locks[j].Lock()
|
||||
}
|
||||
|
||||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||
s.hashes[i].del(hash, series.lset)
|
||||
delete(s.series[j], series.ref)
|
||||
deletedForCallback[series.ref] = series.lset
|
||||
|
||||
if i != j {
|
||||
s.locks[j].Unlock()
|
||||
}
|
||||
|
||||
series.Unlock()
|
||||
check(i, hash, series, deletedForCallback)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue