diff --git a/tsdb/block.go b/tsdb/block.go
index 399a2eed1f..3c820fc626 100644
--- a/tsdb/block.go
+++ b/tsdb/block.go
@@ -169,6 +169,9 @@ type BlockMeta struct {
 
 	// Version of the index format.
 	Version int `json:"version"`
+
+	// OutOfOrder is true if the block was directly created from out-of-order samples.
+	OutOfOrder bool `json:"out_of_order"`
 }
 
 // BlockStats contains stats about contents of a block.
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 80d3f62536..2db02c340e 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -608,9 +608,10 @@ func (c *LeveledCompactor) compactOOO(dest string, oooHead *OOOCompactionHead, s
 		for jx := range outBlocks[ix] {
 			uid := ulid.MustNew(outBlocksTime, rand.Reader)
 			meta := &BlockMeta{
-				ULID:    uid,
-				MinTime: mint,
-				MaxTime: maxt,
+				ULID:       uid,
+				MinTime:    mint,
+				MaxTime:    maxt,
+				OutOfOrder: true,
 			}
 			meta.Compaction.Level = 1
 			meta.Compaction.Sources = []ulid.ULID{uid}
diff --git a/tsdb/db.go b/tsdb/db.go
index 305559e222..2670118bea 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -838,10 +838,13 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 	// Set the min valid time for the ingested samples
 	// to be no lower than the maxt of the last block.
-	blocks := db.Blocks()
 	minValidTime := int64(math.MinInt64)
-	if len(blocks) > 0 {
-		minValidTime = blocks[len(blocks)-1].Meta().MaxTime
+	// We do not consider blocks created from out-of-order samples for Head's minValidTime
+	// since minValidTime is only for the in-order data and we do not want to discard unnecessary
+	// samples from the Head.
+	inOrderMaxTime, ok := db.inOrderBlocksMaxTime()
+	if ok {
+		minValidTime = inOrderMaxTime
 	}
 
 	if initErr := db.head.Init(minValidTime); initErr != nil {
@@ -858,7 +861,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 			return nil, errors.Wrap(err, "repair corrupted WAL")
 		}
 	}
-	}
 
 	go db.run()
 
@@ -991,6 +993,7 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
 		}
 	}
 
+	db.opts.OutOfOrderTimeWindow = oooTimeWindow
 	db.head.ApplyConfig(conf, wblog)
 
 	if !db.oooWasEnabled.Load() {
@@ -1237,10 +1240,11 @@ func (db *DB) reload() error {
 	if err := db.reloadBlocks(); err != nil {
 		return errors.Wrap(err, "reloadBlocks")
 	}
-	if len(db.blocks) == 0 {
+	maxt, ok := db.inOrderBlocksMaxTime()
+	if !ok {
 		return nil
 	}
-	if err := db.head.Truncate(db.blocks[len(db.blocks)-1].MaxTime()); err != nil {
+	if err := db.head.Truncate(maxt); err != nil {
 		return errors.Wrap(err, "head truncate")
 	}
 	return nil
@@ -1636,6 +1640,30 @@ func (db *DB) Blocks() []*Block {
 	return db.blocks
 }
 
+// inOrderBlocksMaxTime returns the max time among the blocks that were not totally created
+// out of out-of-order data. If the returned boolean is true, it means there is at least
+// one such block.
+func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) {
+	maxt, ok, hasOOO := int64(math.MinInt64), false, false
+	// If blocks are overlapping, last block might not have the max time. So check all blocks.
+	for _, b := range db.Blocks() {
+		hasOOO = hasOOO || b.meta.OutOfOrder
+		if !b.meta.OutOfOrder && b.meta.MaxTime > maxt {
+			ok = true
+			maxt = b.meta.MaxTime
+		}
+	}
+	if !hasOOO && ok && db.opts.OutOfOrderTimeWindow > 0 {
+		// Temporary patch. To be removed by mid July 2022.
+		// Before this patch, blocks did not have "out_of_order" in their meta, so we cannot
+		// say which block has the out_of_order data. In that case the out-of-order block can be
+		// up to 2 block ranges ahead of the latest in-order block.
+		// Note: if hasOOO was true, it means the latest block has the new meta and is taken care in inOrderBlocksMaxTime().
+		maxt -= 2 * db.opts.MinBlockDuration
+	}
+	return maxt, ok
+}
+
 // Head returns the databases's head.
 func (db *DB) Head() *Head {
 	return db.head
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index a629843b45..20dea2642b 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -5192,3 +5192,133 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
 		require.Nil(t, db.head.wbl)
 	})
 }
+
+func TestNoGapAfterRestartWithOOO(t *testing.T) {
+	series1 := labels.FromStrings("foo", "bar1")
+	addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool) {
+		app := db.Appender(context.Background())
+		for min := fromMins; min <= toMins; min++ {
+			ts := min * time.Minute.Milliseconds()
+			_, err := app.Append(0, series1, ts, float64(ts))
+			if success {
+				require.NoError(t, err)
+			} else {
+				require.Error(t, err)
+			}
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	verifySamples := func(t *testing.T, db *DB, fromMins, toMins int64) {
+		var expSamples []tsdbutil.Sample
+		for min := fromMins; min <= toMins; min++ {
+			ts := min * time.Minute.Milliseconds()
+			expSamples = append(expSamples, sample{t: ts, v: float64(ts)})
+		}
+
+		expRes := map[string][]tsdbutil.Sample{
+			series1.String(): expSamples,
+		}
+
+		q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
+		require.NoError(t, err)
+
+		actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
+		require.Equal(t, expRes, actRes)
+	}
+
+	cases := []struct {
+		inOrderMint, inOrderMaxt int64
+		oooMint, oooMaxt         int64
+		// After compaction.
+		blockRanges        [][2]int64
+		headMint, headMaxt int64
+		// Head time ranges after restart for old blocks.
+		legacyHeadMint, legacyHeadMaxt int64
+	}{
+		{
+			300, 490,
+			489, 489,
+			[][2]int64{{300, 360}, {480, 600}},
+			360, 490,
+			360, 490, // OOO blocks is already 2 ranges ahead of the in-order block.
+		},
+		{
+			300, 490,
+			479, 479,
+			[][2]int64{{300, 360}, {360, 480}},
+			360, 490,
+			240, 490, // OOO block was only 1 range ahead of in-order block.
+		},
+	}
+
+	for i, c := range cases {
+		// legacy = true means the out-of-order blocks don't have the `out_of_order: true` metadata.
+		for _, legacy := range []bool{false, true} {
+			t.Run(fmt.Sprintf("case=%d,legacy=%t", i, legacy), func(t *testing.T) {
+				dir := t.TempDir()
+
+				opts := DefaultOptions()
+				opts.OutOfOrderTimeWindow = 30 * time.Minute.Milliseconds()
+
+				db, err := Open(dir, nil, nil, opts, nil)
+				require.NoError(t, err)
+				db.DisableCompactions()
+				t.Cleanup(func() {
+					require.NoError(t, db.Close())
+				})
+
+				// 3h10m=190m worth in-order data.
+				addSamples(t, db, c.inOrderMint, c.inOrderMaxt, true)
+				verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)
+
+				// One ooo samples.
+				addSamples(t, db, c.oooMint, c.oooMaxt, true)
+				verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)
+
+				// We get 2 blocks. 1 from OOO, 1 from in-order.
+				require.NoError(t, db.Compact())
+				verifyBlockRanges := func() {
+					blocks := db.Blocks()
+					require.Equal(t, len(c.blockRanges), len(blocks))
+					for j, br := range c.blockRanges {
+						require.Equal(t, br[0]*time.Minute.Milliseconds(), blocks[j].MinTime())
+						require.Equal(t, br[1]*time.Minute.Milliseconds(), blocks[j].MaxTime())
+					}
+				}
+				verifyBlockRanges()
+				require.Equal(t, c.headMint*time.Minute.Milliseconds(), db.head.MinTime())
+				require.Equal(t, c.headMaxt*time.Minute.Milliseconds(), db.head.MaxTime())
+
+				if legacy {
+					// In the legacy version, the blocks from out-of-order data did not write a
+					// "out_of_order: true" to the meta. So we remove it here.
+					for _, b := range db.Blocks() {
+						m, _, err := readMetaFile(b.Dir())
+						require.NoError(t, err)
+						m.OutOfOrder = false
+						_, err = writeMetaFile(log.NewNopLogger(), b.Dir(), m)
+						require.NoError(t, err)
+					}
+				}
+
+				// Restart and expect all samples to be present.
+				require.NoError(t, db.Close())
+
+				db, err = Open(dir, nil, nil, opts, nil)
+				require.NoError(t, err)
+				db.DisableCompactions()
+
+				verifyBlockRanges()
+				if legacy {
+					require.Equal(t, c.legacyHeadMint*time.Minute.Milliseconds(), db.head.MinTime())
+					require.Equal(t, c.legacyHeadMaxt*time.Minute.Milliseconds(), db.head.MaxTime())
+				} else {
+					require.Equal(t, c.headMint*time.Minute.Milliseconds(), db.head.MinTime())
+					require.Equal(t, c.headMaxt*time.Minute.Milliseconds(), db.head.MaxTime())
+				}
+				verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)
+			})
+		}
+	}
+}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index e92bcc8b88..6b6e835769 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -464,9 +464,9 @@ func (wp *walSubsetProcessor) waitUntilIdle() {
 }
 
 func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
-	// Track number of samples that referenced a series we don't know about
+	// Track number of samples, m-map markers, that referenced a series we don't know about
 	// for error reporting.
-	var unknownRefs atomic.Uint64
+	var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
 
 	lastSeq, lastOff := lastMmapRef.Unpack()
 	// Start workers that each process samples for a partition of the series ID space.
@@ -593,9 +593,13 @@ func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 				continue
 			}
 
+			if r, ok := multiRef[rm.Ref]; ok {
+				rm.Ref = r
+			}
+
 			ms := h.series.getByID(rm.Ref)
 			if ms == nil {
-				unknownRefs.Inc()
+				mmapMarkerUnknownRefs.Inc()
 				continue
 			}
@@ -635,8 +639,8 @@ func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 		return errors.Wrap(r.Err(), "read records")
 	}
 
-	if unknownRefs.Load() > 0 {
-		level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load())
+	if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
+		level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
 	}
 	return nil
 }