mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Avoid gaps in in-order data after restart with out-of-order enabled (#277)
* Avoid gaps in in-order data after restart with out-of-order enabled Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests, do the temporary patch only if OOO is enabled Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Avoid Peter's confusion Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Use latest OutOfOrderTimeWindow Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
1446b53d87
commit
5e8406a1d4
|
@ -169,6 +169,9 @@ type BlockMeta struct {
|
||||||
|
|
||||||
// Version of the index format.
|
// Version of the index format.
|
||||||
Version int `json:"version"`
|
Version int `json:"version"`
|
||||||
|
|
||||||
|
// OutOfOrder is true if the block was directly created from out-of-order samples.
|
||||||
|
OutOfOrder bool `json:"out_of_order"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockStats contains stats about contents of a block.
|
// BlockStats contains stats about contents of a block.
|
||||||
|
|
|
@ -608,9 +608,10 @@ func (c *LeveledCompactor) compactOOO(dest string, oooHead *OOOCompactionHead, s
|
||||||
for jx := range outBlocks[ix] {
|
for jx := range outBlocks[ix] {
|
||||||
uid := ulid.MustNew(outBlocksTime, rand.Reader)
|
uid := ulid.MustNew(outBlocksTime, rand.Reader)
|
||||||
meta := &BlockMeta{
|
meta := &BlockMeta{
|
||||||
ULID: uid,
|
ULID: uid,
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
MaxTime: maxt,
|
MaxTime: maxt,
|
||||||
|
OutOfOrder: true,
|
||||||
}
|
}
|
||||||
meta.Compaction.Level = 1
|
meta.Compaction.Level = 1
|
||||||
meta.Compaction.Sources = []ulid.ULID{uid}
|
meta.Compaction.Sources = []ulid.ULID{uid}
|
||||||
|
|
40
tsdb/db.go
40
tsdb/db.go
|
@ -838,10 +838,13 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
}
|
}
|
||||||
// Set the min valid time for the ingested samples
|
// Set the min valid time for the ingested samples
|
||||||
// to be no lower than the maxt of the last block.
|
// to be no lower than the maxt of the last block.
|
||||||
blocks := db.Blocks()
|
|
||||||
minValidTime := int64(math.MinInt64)
|
minValidTime := int64(math.MinInt64)
|
||||||
if len(blocks) > 0 {
|
// We do not consider blocks created from out-of-order samples for Head's minValidTime
|
||||||
minValidTime = blocks[len(blocks)-1].Meta().MaxTime
|
// since minValidTime is only for the in-order data and we do not want to discard unnecessary
|
||||||
|
// samples from the Head.
|
||||||
|
inOrderMaxTime, ok := db.inOrderBlocksMaxTime()
|
||||||
|
if ok {
|
||||||
|
minValidTime = inOrderMaxTime
|
||||||
}
|
}
|
||||||
|
|
||||||
if initErr := db.head.Init(minValidTime); initErr != nil {
|
if initErr := db.head.Init(minValidTime); initErr != nil {
|
||||||
|
@ -858,7 +861,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
return nil, errors.Wrap(err, "repair corrupted WAL")
|
return nil, errors.Wrap(err, "repair corrupted WAL")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go db.run()
|
go db.run()
|
||||||
|
@ -991,6 +993,7 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
db.opts.OutOfOrderTimeWindow = oooTimeWindow
|
||||||
db.head.ApplyConfig(conf, wblog)
|
db.head.ApplyConfig(conf, wblog)
|
||||||
|
|
||||||
if !db.oooWasEnabled.Load() {
|
if !db.oooWasEnabled.Load() {
|
||||||
|
@ -1237,10 +1240,11 @@ func (db *DB) reload() error {
|
||||||
if err := db.reloadBlocks(); err != nil {
|
if err := db.reloadBlocks(); err != nil {
|
||||||
return errors.Wrap(err, "reloadBlocks")
|
return errors.Wrap(err, "reloadBlocks")
|
||||||
}
|
}
|
||||||
if len(db.blocks) == 0 {
|
maxt, ok := db.inOrderBlocksMaxTime()
|
||||||
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := db.head.Truncate(db.blocks[len(db.blocks)-1].MaxTime()); err != nil {
|
if err := db.head.Truncate(maxt); err != nil {
|
||||||
return errors.Wrap(err, "head truncate")
|
return errors.Wrap(err, "head truncate")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -1636,6 +1640,30 @@ func (db *DB) Blocks() []*Block {
|
||||||
return db.blocks
|
return db.blocks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inOrderBlocksMaxTime returns the max time among the blocks that were not totally created
|
||||||
|
// out of out-of-order data. If the returned boolean is true, it means there is at least
|
||||||
|
// one such block.
|
||||||
|
func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) {
|
||||||
|
maxt, ok, hasOOO := int64(math.MinInt64), false, false
|
||||||
|
// If blocks are overlapping, last block might not have the max time. So check all blocks.
|
||||||
|
for _, b := range db.Blocks() {
|
||||||
|
hasOOO = hasOOO || b.meta.OutOfOrder
|
||||||
|
if !b.meta.OutOfOrder && b.meta.MaxTime > maxt {
|
||||||
|
ok = true
|
||||||
|
maxt = b.meta.MaxTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasOOO && ok && db.opts.OutOfOrderTimeWindow > 0 {
|
||||||
|
// Temporary patch. To be removed by mid July 2022.
|
||||||
|
// Before this patch, blocks did not have "out_of_order" in their meta, so we cannot
|
||||||
|
// say which block has the out_of_order data. In that case the out-of-order block can be
|
||||||
|
// up to 2 block ranges ahead of the latest in-order block.
|
||||||
|
// Note: if hasOOO was true, it means the latest block has the new meta and is taken care in inOrderBlocksMaxTime().
|
||||||
|
maxt -= 2 * db.opts.MinBlockDuration
|
||||||
|
}
|
||||||
|
return maxt, ok
|
||||||
|
}
|
||||||
|
|
||||||
// Head returns the database's head.
|
// Head returns the database's head.
|
||||||
func (db *DB) Head() *Head {
|
func (db *DB) Head() *Head {
|
||||||
return db.head
|
return db.head
|
||||||
|
|
130
tsdb/db_test.go
130
tsdb/db_test.go
|
@ -5192,3 +5192,133 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
|
||||||
require.Nil(t, db.head.wbl)
|
require.Nil(t, db.head.wbl)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNoGapAfterRestartWithOOO(t *testing.T) {
|
||||||
|
series1 := labels.FromStrings("foo", "bar1")
|
||||||
|
addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool) {
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
for min := fromMins; min <= toMins; min++ {
|
||||||
|
ts := min * time.Minute.Milliseconds()
|
||||||
|
_, err := app.Append(0, series1, ts, float64(ts))
|
||||||
|
if success {
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
verifySamples := func(t *testing.T, db *DB, fromMins, toMins int64) {
|
||||||
|
var expSamples []tsdbutil.Sample
|
||||||
|
for min := fromMins; min <= toMins; min++ {
|
||||||
|
ts := min * time.Minute.Milliseconds()
|
||||||
|
expSamples = append(expSamples, sample{t: ts, v: float64(ts)})
|
||||||
|
}
|
||||||
|
|
||||||
|
expRes := map[string][]tsdbutil.Sample{
|
||||||
|
series1.String(): expSamples,
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||||
|
require.Equal(t, expRes, actRes)
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
inOrderMint, inOrderMaxt int64
|
||||||
|
oooMint, oooMaxt int64
|
||||||
|
// After compaction.
|
||||||
|
blockRanges [][2]int64
|
||||||
|
headMint, headMaxt int64
|
||||||
|
// Head time ranges after restart for old blocks.
|
||||||
|
legacyHeadMint, legacyHeadMaxt int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
300, 490,
|
||||||
|
489, 489,
|
||||||
|
[][2]int64{{300, 360}, {480, 600}},
|
||||||
|
360, 490,
|
||||||
|
360, 490, // OOO blocks is already 2 ranges ahead of the in-order block.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
300, 490,
|
||||||
|
479, 479,
|
||||||
|
[][2]int64{{300, 360}, {360, 480}},
|
||||||
|
360, 490,
|
||||||
|
240, 490, // OOO block was only 1 range ahead of in-order block.
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, c := range cases {
|
||||||
|
// legacy = true means the out-of-order blocks don't have the `out_of_order: true` metadata.
|
||||||
|
for _, legacy := range []bool{false, true} {
|
||||||
|
t.Run(fmt.Sprintf("case=%d,legacy=%t", i, legacy), func(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
opts := DefaultOptions()
|
||||||
|
opts.OutOfOrderTimeWindow = 30 * time.Minute.Milliseconds()
|
||||||
|
|
||||||
|
db, err := Open(dir, nil, nil, opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
db.DisableCompactions()
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
// 3h10m=190m worth in-order data.
|
||||||
|
addSamples(t, db, c.inOrderMint, c.inOrderMaxt, true)
|
||||||
|
verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)
|
||||||
|
|
||||||
|
// One ooo samples.
|
||||||
|
addSamples(t, db, c.oooMint, c.oooMaxt, true)
|
||||||
|
verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)
|
||||||
|
|
||||||
|
// We get 2 blocks. 1 from OOO, 1 from in-order.
|
||||||
|
require.NoError(t, db.Compact())
|
||||||
|
verifyBlockRanges := func() {
|
||||||
|
blocks := db.Blocks()
|
||||||
|
require.Equal(t, len(c.blockRanges), len(blocks))
|
||||||
|
for j, br := range c.blockRanges {
|
||||||
|
require.Equal(t, br[0]*time.Minute.Milliseconds(), blocks[j].MinTime())
|
||||||
|
require.Equal(t, br[1]*time.Minute.Milliseconds(), blocks[j].MaxTime())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
verifyBlockRanges()
|
||||||
|
require.Equal(t, c.headMint*time.Minute.Milliseconds(), db.head.MinTime())
|
||||||
|
require.Equal(t, c.headMaxt*time.Minute.Milliseconds(), db.head.MaxTime())
|
||||||
|
|
||||||
|
if legacy {
|
||||||
|
// In the legacy version, the blocks from out-of-order data did not write a
|
||||||
|
// "out_of_order: true" to the meta. So we remove it here.
|
||||||
|
for _, b := range db.Blocks() {
|
||||||
|
m, _, err := readMetaFile(b.Dir())
|
||||||
|
require.NoError(t, err)
|
||||||
|
m.OutOfOrder = false
|
||||||
|
_, err = writeMetaFile(log.NewNopLogger(), b.Dir(), m)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restart and expect all samples to be present.
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
db, err = Open(dir, nil, nil, opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
db.DisableCompactions()
|
||||||
|
|
||||||
|
verifyBlockRanges()
|
||||||
|
if legacy {
|
||||||
|
require.Equal(t, c.legacyHeadMint*time.Minute.Milliseconds(), db.head.MinTime())
|
||||||
|
require.Equal(t, c.legacyHeadMaxt*time.Minute.Milliseconds(), db.head.MaxTime())
|
||||||
|
} else {
|
||||||
|
require.Equal(t, c.headMint*time.Minute.Milliseconds(), db.head.MinTime())
|
||||||
|
require.Equal(t, c.headMaxt*time.Minute.Milliseconds(), db.head.MaxTime())
|
||||||
|
}
|
||||||
|
verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -464,9 +464,9 @@ func (wp *walSubsetProcessor) waitUntilIdle() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
||||||
// Track number of samples that referenced a series we don't know about
|
// Track number of samples and m-map markers that referenced a series we don't know about
|
||||||
// for error reporting.
|
// for error reporting.
|
||||||
var unknownRefs atomic.Uint64
|
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
|
||||||
|
|
||||||
lastSeq, lastOff := lastMmapRef.Unpack()
|
lastSeq, lastOff := lastMmapRef.Unpack()
|
||||||
// Start workers that each process samples for a partition of the series ID space.
|
// Start workers that each process samples for a partition of the series ID space.
|
||||||
|
@ -593,9 +593,13 @@ func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r, ok := multiRef[rm.Ref]; ok {
|
||||||
|
rm.Ref = r
|
||||||
|
}
|
||||||
|
|
||||||
ms := h.series.getByID(rm.Ref)
|
ms := h.series.getByID(rm.Ref)
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
unknownRefs.Inc()
|
mmapMarkerUnknownRefs.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -635,8 +639,8 @@ func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
||||||
return errors.Wrap(r.Err(), "read records")
|
return errors.Wrap(r.Err(), "read records")
|
||||||
}
|
}
|
||||||
|
|
||||||
if unknownRefs.Load() > 0 {
|
if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
|
||||||
level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load())
|
level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue