mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-14 17:44:06 -08:00
Update minOOOTime after truncating Head (#309)
* Update minOOOTime after truncating Head Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add a unit test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
f537910769
commit
9fe7d3a478
106
tsdb/head.go
106
tsdb/head.go
|
@ -1048,30 +1048,7 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
|||
}
|
||||
|
||||
h.metrics.headTruncateTotal.Inc()
|
||||
start := time.Now()
|
||||
|
||||
actualMint, minMmapFile := h.gc()
|
||||
level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
|
||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||
if actualMint > h.minTime.Load() {
|
||||
// The actual mint of the Head is higher than the one asked to truncate.
|
||||
appendableMinValidTime := h.appendableMinValidTime()
|
||||
if actualMint < appendableMinValidTime {
|
||||
h.minTime.Store(actualMint)
|
||||
h.minValidTime.Store(actualMint)
|
||||
} else {
|
||||
// The actual min time is in the appendable window.
|
||||
// So we set the mint to the appendableMinValidTime.
|
||||
h.minTime.Store(appendableMinValidTime)
|
||||
h.minValidTime.Store(appendableMinValidTime)
|
||||
}
|
||||
}
|
||||
|
||||
// Truncate the chunk m-mapper.
|
||||
if err := h.chunkDiskMapper.Truncate(minMmapFile); err != nil {
|
||||
return errors.Wrap(err, "truncate chunks.HeadReadWriter by file number")
|
||||
}
|
||||
return nil
|
||||
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
|
||||
}
|
||||
|
||||
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
|
||||
|
@ -1234,33 +1211,50 @@ func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapper
|
|||
curMinOOOMmapRef := chunks.ChunkDiskMapperRef(h.minOOOMmapRef.Load())
|
||||
if minOOOMmapRef.GreaterThan(curMinOOOMmapRef) {
|
||||
h.minOOOMmapRef.Store(uint64(minOOOMmapRef))
|
||||
start := time.Now()
|
||||
actualMint, minMmapFile := h.gc()
|
||||
level.Info(h.logger).Log("msg", "Head GC completed in truncateOOO", "duration", time.Since(start))
|
||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||
if actualMint > h.minTime.Load() {
|
||||
// The actual mint of the Head is higher than the one asked to truncate.
|
||||
appendableMinValidTime := h.appendableMinValidTime()
|
||||
if actualMint < appendableMinValidTime {
|
||||
h.minTime.Store(actualMint)
|
||||
h.minValidTime.Store(actualMint)
|
||||
} else {
|
||||
// The actual min time is in the appendable window.
|
||||
// So we set the mint to the appendableMinValidTime.
|
||||
h.minTime.Store(appendableMinValidTime)
|
||||
h.minValidTime.Store(appendableMinValidTime)
|
||||
}
|
||||
}
|
||||
|
||||
// Truncate the chunk m-mapper.
|
||||
if err := h.chunkDiskMapper.Truncate(minMmapFile); err != nil {
|
||||
return errors.Wrap(err, "truncate chunks.HeadReadWriter by file number in truncateOOO")
|
||||
if err := h.truncateSeriesAndChunkDiskMapper("truncateOOO"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return h.wbl.Truncate(lastWBLFile)
|
||||
}
|
||||
|
||||
// truncateSeriesAndChunkDiskMapper is a helper function for truncateMemory and truncateOOO.
|
||||
// It runs GC on the Head and truncates the ChunkDiskMapper accordingly.
|
||||
func (h *Head) truncateSeriesAndChunkDiskMapper(caller string) error {
|
||||
start := time.Now()
|
||||
headMaxt := h.MaxTime()
|
||||
actualMint, minOOOTime, minMmapFile := h.gc()
|
||||
level.Info(h.logger).Log("msg", "Head GC completed", "caller", caller, "duration", time.Since(start))
|
||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||
if actualMint > h.minTime.Load() {
|
||||
// The actual mint of the Head is higher than the one asked to truncate.
|
||||
appendableMinValidTime := h.appendableMinValidTime()
|
||||
if actualMint < appendableMinValidTime {
|
||||
h.minTime.Store(actualMint)
|
||||
h.minValidTime.Store(actualMint)
|
||||
} else {
|
||||
// The actual min time is in the appendable window.
|
||||
// So we set the mint to the appendableMinValidTime.
|
||||
h.minTime.Store(appendableMinValidTime)
|
||||
h.minValidTime.Store(appendableMinValidTime)
|
||||
}
|
||||
}
|
||||
if headMaxt-h.opts.OutOfOrderTimeWindow.Load() < minOOOTime {
|
||||
// The allowed OOO window is lower than the min OOO time seen during GC.
|
||||
// So it is possible that some OOO sample was inserted that was less that minOOOTime.
|
||||
// So we play safe and set it to the min that was possible.
|
||||
minOOOTime = headMaxt - h.opts.OutOfOrderTimeWindow.Load()
|
||||
}
|
||||
h.minOOOTime.Store(minOOOTime)
|
||||
|
||||
// Truncate the chunk m-mapper.
|
||||
if err := h.chunkDiskMapper.Truncate(minMmapFile); err != nil {
|
||||
return errors.Wrap(err, "truncate chunks.HeadReadWriter by file number")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
NumSeries uint64
|
||||
MinTime, MaxTime int64
|
||||
|
@ -1396,8 +1390,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
|
|||
// gc removes data before the minimum timestamp from the head.
|
||||
// It returns
|
||||
// * The actual min times of the chunks present in the Head.
|
||||
// * The min OOO time seen during the GC.
|
||||
// * Min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
|
||||
func (h *Head) gc() (int64, int) {
|
||||
func (h *Head) gc() (int64, int64, int) {
|
||||
// Only data strictly lower than this timestamp must be deleted.
|
||||
mint := h.MinTime()
|
||||
// Only ooo m-map chunks strictly lower than or equal to this ref
|
||||
|
@ -1406,7 +1401,7 @@ func (h *Head) gc() (int64, int) {
|
|||
|
||||
// Drop old chunks and remember series IDs and hashes if they can be
|
||||
// deleted entirely.
|
||||
deleted, chunksRemoved, actualMint, minMmapFile := h.series.gc(mint, minOOOMmapRef)
|
||||
deleted, chunksRemoved, actualMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
|
||||
seriesRemoved := len(deleted)
|
||||
|
||||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||||
|
@ -1436,7 +1431,7 @@ func (h *Head) gc() (int64, int) {
|
|||
h.deletedMtx.Unlock()
|
||||
}
|
||||
|
||||
return actualMint, minMmapFile
|
||||
return actualMint, minOOOTime, minMmapFile
|
||||
}
|
||||
|
||||
// Tombstones returns a new reader over the head's tombstones
|
||||
|
@ -1643,12 +1638,13 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
|
|||
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
|
||||
// and there's no easy way to cast maps.
|
||||
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
|
||||
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _ int64, minMmapFile int) {
|
||||
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
|
||||
var (
|
||||
deleted = map[storage.SeriesRef]struct{}{}
|
||||
deletedForCallback = []labels.Labels{}
|
||||
rmChunks = 0
|
||||
actualMint int64 = math.MaxInt64
|
||||
minOOOTime int64 = math.MaxInt64
|
||||
)
|
||||
minMmapFile = math.MaxInt32
|
||||
// Run through all series and truncate old chunks. Mark those with no
|
||||
|
@ -1672,6 +1668,16 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|||
if seq < minMmapFile {
|
||||
minMmapFile = seq
|
||||
}
|
||||
for _, ch := range series.oooMmappedChunks {
|
||||
if ch.minTime < minOOOTime {
|
||||
minOOOTime = ch.minTime
|
||||
}
|
||||
}
|
||||
}
|
||||
if series.oooHeadChunk != nil {
|
||||
if series.oooHeadChunk.minTime < minOOOTime {
|
||||
minOOOTime = series.oooHeadChunk.minTime
|
||||
}
|
||||
}
|
||||
if len(series.mmappedChunks) > 0 || len(series.oooMmappedChunks) > 0 ||
|
||||
series.headChunk != nil || series.oooHeadChunk != nil || series.pendingCommit {
|
||||
|
@ -1717,7 +1723,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|||
actualMint = mint
|
||||
}
|
||||
|
||||
return deleted, rmChunks, actualMint, minMmapFile
|
||||
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
|
||||
}
|
||||
|
||||
func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries {
|
||||
|
|
|
@ -3696,3 +3696,50 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
|
|||
appendSample(s5, 240)
|
||||
verifyInOrderSamples(s5, 1)
|
||||
}
|
||||
|
||||
func TestHeadMinOOOTimeUpdate(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
|
||||
require.NoError(t, err)
|
||||
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
|
||||
|
||||
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, h.Close())
|
||||
})
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
appendSample := func(ts int64) {
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
app := h.Appender(context.Background())
|
||||
_, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
appendSample(300) // In-order sample.
|
||||
|
||||
require.Equal(t, int64(math.MaxInt64), h.MinOOOTime())
|
||||
|
||||
appendSample(295) // OOO sample.
|
||||
require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime())
|
||||
|
||||
// Allowed window for OOO is >=290, which is before the earliest ooo sample 295, so it gets set to the lower value.
|
||||
require.NoError(t, h.truncateOOO(1, 1))
|
||||
require.Equal(t, 290*time.Minute.Milliseconds(), h.MinOOOTime())
|
||||
|
||||
appendSample(310) // In-order sample.
|
||||
appendSample(305) // OOO sample.
|
||||
require.Equal(t, 290*time.Minute.Milliseconds(), h.MinOOOTime())
|
||||
|
||||
// Now the OOO sample 295 was not gc'ed yet. And allowed window for OOO is now >=300.
|
||||
// So the lowest among them, 295, is set as minOOOTime.
|
||||
require.NoError(t, h.truncateOOO(2, 2))
|
||||
require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue