React UI: Add Starting Screen (#8662)

* Added walreplay API endpoint

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Added starting page to react-ui

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Documented the new endpoint

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Fixed typos

Signed-off-by: Levi Harrison <git@leviharrison.dev>

Co-authored-by: Julius Volz <julius.volz@gmail.com>

* Removed logo

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed isResponding to isUnexpected

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed width of progress bar

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed width of progress bar

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Added DB stats object

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Updated starting page to work with new fields

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Passing nil

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Passing nil (pt. 2)

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Passing nil (pt. 3)

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Passing nil (and also implementing a method this time) (pt. 4)

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Passing nil (and also implementing a method this time) (pt. 5)

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed const to let

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Passing nil (pt. 6)

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Remove SetStats method

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Added comma

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed api

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed to triple equals

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Fixed data response types

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Don't return pointer

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Changed version

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Fixed interface issue

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Fixed pointer

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Fixed copying lock value error

Signed-off-by: Levi Harrison <git@leviharrison.dev>

Co-authored-by: Julius Volz <julius.volz@gmail.com>
This commit is contained in:
Levi Harrison 2021-06-05 10:29:32 -04:00 committed by GitHub
parent b781b5cac5
commit 7bc11dcb06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 395 additions and 72 deletions

View file

@ -434,7 +434,7 @@ func main() {
level.Info(logger).Log("vm_limits", prom_runtime.VMLimits())
var (
localStorage = &readyStorage{}
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
@ -815,11 +815,13 @@ func main() {
return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
}
}
db, err := openDBWithMetrics(
cfg.localStoragePath,
logger,
prometheus.DefaultRegisterer,
&opts,
localStorage.getStats(),
)
if err != nil {
return errors.Wrapf(err, "opening storage failed")
@ -901,12 +903,13 @@ func main() {
level.Info(logger).Log("msg", "See you next time!")
}
func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options) (*tsdb.DB, error) {
func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options, stats *tsdb.DBStats) (*tsdb.DB, error) {
db, err := tsdb.Open(
dir,
log.With(logger, "component", "tsdb"),
reg,
opts,
stats,
)
if err != nil {
return nil, err
@ -1076,6 +1079,7 @@ type readyStorage struct {
mtx sync.RWMutex
db *tsdb.DB
startTimeMargin int64
stats *tsdb.DBStats
}
// Set the storage.
@ -1087,7 +1091,6 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.startTimeMargin = startTimeMargin
}
// get is internal, you should use readyStorage as the front implementation layer.
func (s *readyStorage) get() *tsdb.DB {
s.mtx.RLock()
x := s.db
@ -1095,6 +1098,13 @@ func (s *readyStorage) get() *tsdb.DB {
return x
}
// getStats returns the DB stats object under the storage mutex. Unlike get,
// it does not depend on the DB having been set, so it is usable before the
// storage is ready.
func (s *readyStorage) getStats() *tsdb.DBStats {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.stats
}
// StartTime implements the Storage interface.
func (s *readyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil {
@ -1197,6 +1207,14 @@ func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) {
return nil, tsdb.ErrNotReady
}
// WALReplayStatus implements the api_v1.TSDBStats interface.
// It returns a point-in-time copy of the head's WAL replay progress, or
// tsdb.ErrNotReady when no stats object has been attached to the storage.
func (s *readyStorage) WALReplayStatus() (tsdb.WALReplayStatus, error) {
	stats := s.getStats()
	if stats == nil {
		return tsdb.WALReplayStatus{}, tsdb.ErrNotReady
	}
	return stats.Head.WALReplayStatus.GetWALReplayStatus(), nil
}
// ErrNotReady is returned if the underlying scrape manager is not ready yet.
var ErrNotReady = errors.New("Scrape manager not ready")

View file

@ -289,7 +289,7 @@ func TestTimeMetrics(t *testing.T) {
}()
reg := prometheus.NewRegistry()
db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil)
db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())

View file

@ -548,7 +548,7 @@ after_eof 1 2
}
require.NoError(t, err)
db, err := tsdb.Open(outputDir, nil, nil, tsdb.DefaultOptions())
db, err := tsdb.Open(outputDir, nil, nil, tsdb.DefaultOptions(), nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())

View file

@ -115,7 +115,7 @@ func TestBackfillRuleIntegration(t *testing.T) {
opts := tsdb.DefaultOptions()
opts.AllowOverlappingBlocks = true
db, err := tsdb.Open(tmpDir, nil, nil, opts)
db, err := tsdb.Open(tmpDir, nil, nil, opts, nil)
require.NoError(t, err)
blocks := db.Blocks()

View file

@ -87,7 +87,7 @@ func benchmarkWrite(outPath, samplesFile string, numMetrics, numScrapes int) err
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: int64(2 * time.Hour / time.Millisecond),
})
}, tsdb.NewDBStats())
if err != nil {
return err
}

View file

@ -1033,6 +1033,39 @@ $ curl http://localhost:9090/api/v1/status/tsdb
*New in v2.15*
### WAL Replay Stats
The following endpoint returns information about the WAL replay:
```
GET /api/v1/status/walreplay
```
**min**: The number of the first WAL segment to be replayed.
**max**: The number of the last WAL segment to be replayed.
**current**: The number of the segment currently being replayed.
**state**: The state of the replay. Possible states:
- **waiting**: Waiting for the replay to start.
- **in progress**: The replay is in progress.
- **done**: The replay has finished.
```json
$ curl http://localhost:9090/api/v1/status/walreplay
{
"status": "success",
"data": {
"min": 2,
"max": 5,
"current": 40,
"state": "in progress"
}
}
```
NOTE: This endpoint is available before the server has been marked ready and is updated in real time to facilitate monitoring the progress of the WAL replay.
*New in v2.28*
## TSDB Admin APIs
These are APIs that expose database functionalities for the advanced user. These APIs are not enabled unless the `--web.enable-admin-api` is set.

View file

@ -441,7 +441,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head {
opts := DefaultHeadOptions()
opts.ChunkDirRoot = chunkDir
head, err := NewHead(nil, nil, w, opts)
head, err := NewHead(nil, nil, w, opts, nil)
require.NoError(tb, err)
app := head.Appender(context.Background())

View file

@ -72,7 +72,7 @@ func (w *BlockWriter) initHead() error {
opts := DefaultHeadOptions()
opts.ChunkRange = w.blockSize
opts.ChunkDirRoot = w.chunkDir
h, err := NewHead(nil, w.logger, nil, opts)
h, err := NewHead(nil, w.logger, nil, opts, NewHeadStats())
if err != nil {
return errors.Wrap(err, "tsdb.NewHead")
}

View file

@ -1096,7 +1096,7 @@ func BenchmarkCompactionFromHead(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(b, err)
for ln := 0; ln < labelNames; ln++ {
app := h.Appender(context.Background())
@ -1196,7 +1196,7 @@ func TestCancelCompactions(t *testing.T) {
// Measure the compaction time without interrupting it.
var timeCompactionUninterrupted time.Duration
{
db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000})
db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
require.NoError(t, err)
require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
@ -1216,7 +1216,7 @@ func TestCancelCompactions(t *testing.T) {
}
// Measure the compaction time when closing the db in the middle of compaction.
{
db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000})
db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
require.NoError(t, err)
require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")

View file

@ -297,6 +297,20 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
return m
}
// DBStats contains statistics about the DB separated by component (e.g. head).
// They are available before the DB has finished initializing.
type DBStats struct {
	Head *HeadStats
}

// NewDBStats returns a new DBStats object initialized using the
// new function from each component.
func NewDBStats() *DBStats {
	return &DBStats{
		Head: NewHeadStats(),
	}
}
// ErrClosed is returned when the db is closed.
var ErrClosed = errors.New("db already closed")
@ -346,7 +360,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
}
opts := DefaultHeadOptions()
opts.ChunkDirRoot = db.dir
head, err := NewHead(nil, db.logger, w, opts)
head, err := NewHead(nil, db.logger, w, opts, NewHeadStats())
if err != nil {
return err
}
@ -402,7 +416,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
opts := DefaultHeadOptions()
opts.ChunkDirRoot = db.dir
head, err := NewHead(nil, db.logger, nil, opts)
head, err := NewHead(nil, db.logger, nil, opts, NewHeadStats())
if err != nil {
return nil, err
}
@ -422,7 +436,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
opts := DefaultHeadOptions()
opts.ChunkDirRoot = db.dir
head, err = NewHead(nil, db.logger, w, opts)
head, err = NewHead(nil, db.logger, w, opts, NewHeadStats())
if err != nil {
return nil, err
}
@ -541,10 +555,11 @@ func (db *DBReadOnly) Close() error {
}
// Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error) {
var rngs []int64
opts, rngs = validateOpts(opts, nil)
return open(dir, l, r, opts, rngs)
return open(dir, l, r, opts, rngs, stats)
}
func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
@ -575,13 +590,16 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
return opts, rngs
}
func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64) (_ *DB, returnedErr error) {
func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats) (_ *DB, returnedErr error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
if l == nil {
l = log.NewNopLogger()
}
if stats == nil {
stats = NewDBStats()
}
for i, v := range rngs {
if v > opts.MaxBlockDuration {
@ -678,7 +696,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.StripeSize = opts.StripeSize
headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.NumExemplars = opts.MaxExemplars
db.head, err = NewHead(r, l, wlog, headOpts)
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
if err != nil {
return nil, err
}

View file

@ -62,10 +62,10 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
require.NoError(t, err)
if len(rngs) == 0 {
db, err = Open(tmpdir, nil, nil, opts)
db, err = Open(tmpdir, nil, nil, opts, nil)
} else {
opts, rngs = validateOpts(opts, rngs)
db, err = open(tmpdir, nil, nil, opts, rngs)
db, err = open(tmpdir, nil, nil, opts, rngs, nil)
}
require.NoError(t, err)
@ -241,7 +241,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) {
// Query the data.
{
db, err := Open(db.Dir(), nil, nil, nil)
db, err := Open(db.Dir(), nil, nil, nil, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
@ -581,7 +581,7 @@ func TestDB_Snapshot(t *testing.T) {
require.NoError(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
db, err = Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
@ -633,7 +633,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
require.NoError(t, db.Close())
// Reopen DB from snapshot.
db, err = Open(snap, nil, nil, nil)
db, err = Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
@ -703,7 +703,7 @@ Outer:
require.NoError(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
db, err = Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
@ -923,7 +923,7 @@ func TestWALFlushedOnDBClose(t *testing.T) {
require.NoError(t, db.Close())
db, err = Open(dirDb, nil, nil, nil)
db, err = Open(dirDb, nil, nil, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
@ -1046,7 +1046,7 @@ func TestTombstoneClean(t *testing.T) {
require.NoError(t, db.Close())
// Reopen DB from snapshot.
db, err = Open(snap, nil, nil, nil)
db, err = Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -1135,7 +1135,7 @@ func TestTombstoneCleanResultEmptyBlock(t *testing.T) {
require.NoError(t, db.Close())
// Reopen DB from snapshot.
db, err = Open(snap, nil, nil, nil)
db, err = Open(snap, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -1757,7 +1757,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
db, err := Open(dir, nil, nil, nil)
db, err := Open(dir, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -1799,7 +1799,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
require.NoError(t, err)
require.NoError(t, w.Close())
db, err := Open(dir, nil, nil, nil)
db, err := Open(dir, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -1815,7 +1815,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 1000, 2000))
db, err := Open(dir, nil, nil, nil)
db, err := Open(dir, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -1851,7 +1851,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
r := prometheus.NewRegistry()
db, err := Open(dir, nil, r, nil)
db, err := Open(dir, nil, r, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -2125,7 +2125,7 @@ func TestBlockRanges(t *testing.T) {
// when a non standard block already exists.
firstBlockMaxT := int64(3)
createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000})
db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil)
require.NoError(t, err)
rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1
@ -2177,7 +2177,7 @@ func TestBlockRanges(t *testing.T) {
thirdBlockMaxt := secondBlockMaxt + 2
createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt))
db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000})
db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil)
require.NoError(t, err)
defer db.Close()
@ -2245,7 +2245,7 @@ func TestDBReadOnly(t *testing.T) {
// Open a normal db to use for a comparison.
{
dbWritable, err := Open(dbDir, logger, nil, nil)
dbWritable, err := Open(dbDir, logger, nil, nil, nil)
require.NoError(t, err)
dbWritable.DisableCompactions()
@ -2347,7 +2347,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
}()
// Append data to the WAL.
db, err := Open(dbDir, logger, nil, nil)
db, err := Open(dbDir, logger, nil, nil, nil)
require.NoError(t, err)
db.DisableCompactions()
app := db.Appender(ctx)
@ -2408,7 +2408,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
require.NoError(t, os.RemoveAll(tmpdir))
}()
db, err := Open(tmpdir, nil, nil, nil)
db, err := Open(tmpdir, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -2478,7 +2478,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
require.NoError(t, os.RemoveAll(tmpdir))
}()
db, err := Open(tmpdir, nil, nil, nil)
db, err := Open(tmpdir, nil, nil, nil, nil)
require.NoError(t, err)
defer db.Close()
@ -2805,7 +2805,7 @@ func TestCompactHead(t *testing.T) {
WALCompression: true,
}
db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
require.NoError(t, err)
ctx := context.Background()
app := db.Appender(ctx)
@ -2826,7 +2826,7 @@ func TestCompactHead(t *testing.T) {
// Delete everything but the new block and
// reopen the db to query it to ensure it includes the head data.
require.NoError(t, deleteNonBlocks(db.Dir()))
db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
require.NoError(t, err)
require.Equal(t, 1, len(db.Blocks()))
require.Equal(t, int64(maxt), db.Head().MinTime())
@ -2955,7 +2955,7 @@ func TestOpen_VariousBlockStates(t *testing.T) {
opts := DefaultOptions()
opts.RetentionDuration = 0
db, err := Open(tmpDir, log.NewLogfmtLogger(os.Stderr), nil, opts)
db, err := Open(tmpDir, log.NewLogfmtLogger(os.Stderr), nil, opts, nil)
require.NoError(t, err)
loadedBlocks := db.Blocks()
@ -3000,7 +3000,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
require.NoError(t, os.RemoveAll(tmpDir))
})
db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
@ -3061,7 +3061,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
createBlock(t, db.dir, genSeries(1, 1, newBlockMint, newBlockMaxt))
db, err = Open(db.dir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
db, err = Open(db.dir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
require.NoError(t, err)
db.DisableCompactions()
@ -3119,7 +3119,7 @@ func TestNoPanicOnTSDBOpenError(t *testing.T) {
lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock"))
require.NoError(t, err)
_, err = Open(tmpdir, nil, nil, DefaultOptions())
_, err = Open(tmpdir, nil, nil, DefaultOptions(), nil)
require.Error(t, err)
require.NoError(t, lockf.Release())

View file

@ -105,6 +105,8 @@ type Head struct {
closedMtx sync.Mutex
closed bool
stats *HeadStats
}
// HeadOptions are parameters for the Head block.
@ -307,6 +309,38 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
return m
}
// HeadStats are the statistics for the head component of the DB.
// Like DBStats, they are available before the head has finished initializing,
// which allows the WAL replay progress to be observed while it is running.
type HeadStats struct {
	WALReplayStatus *WALReplayStatus
}

// NewHeadStats returns a new HeadStats object with an empty (zero-progress)
// WAL replay status.
func NewHeadStats() *HeadStats {
	return &HeadStats{
		WALReplayStatus: &WALReplayStatus{},
	}
}
// WALReplayStatus contains status information about the WAL replay.
type WALReplayStatus struct {
sync.RWMutex
Min int
Max int
Current int
}
// GetWALReplayStatus returns the WAL replay status information.
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus {
s.RLock()
defer s.RUnlock()
return WALReplayStatus{
Min: s.Min,
Max: s.Max,
Current: s.Current,
}
}
const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.
@ -328,7 +362,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings
}
// NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions) (*Head, error) {
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
if l == nil {
l = log.NewNopLogger()
}
@ -344,6 +378,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
return nil, err
}
if stats == nil {
stats = NewHeadStats()
}
h := &Head{
wal: wal,
logger: l,
@ -360,6 +398,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
return &memChunk{}
},
},
stats: stats,
}
h.chunkRange.Store(opts.ChunkRange)
h.minTime.Store(math.MaxInt64)
@ -795,6 +834,8 @@ func (h *Head) Init(minValidTime int64) error {
return errors.Wrap(err, "finding WAL segments")
}
h.startWALReplayStatus(startFrom, last)
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
@ -811,6 +852,7 @@ func (h *Head) Init(minValidTime int64) error {
return err
}
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
h.updateWALReplayStatusRead(i)
}
walReplayDuration := time.Since(start)
@ -2701,3 +2743,19 @@ func (h *Head) Size() int64 {
func (h *RangeHead) Size() int64 {
return h.head.Size()
}
// startWALReplayStatus initializes the WAL replay progress: segments
// [startFrom, last] remain to be replayed, and replay is about to begin at
// startFrom.
func (h *Head) startWALReplayStatus(startFrom, last int) {
	st := h.stats.WALReplayStatus
	st.Lock()
	defer st.Unlock()

	st.Min = startFrom
	st.Max = last
	st.Current = startFrom
}
// updateWALReplayStatusRead records that WAL segment `current` has been
// replayed, advancing the progress reported by the walreplay endpoint.
func (h *Head) updateWALReplayStatusRead(current int) {
	st := h.stats.WALReplayStatus
	st.Lock()
	defer st.Unlock()

	st.Current = current
}

View file

@ -36,7 +36,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
@ -55,7 +55,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
@ -83,7 +83,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
// Mock the PreCreation() callback to fail on each series.
opts.SeriesCallback = failingSeriesLifecycleCallback{}
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()

View file

@ -52,7 +52,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
opts.ChunkRange = chunkRange
opts.ChunkDirRoot = dir
opts.NumExemplars = 10
h, err := NewHead(nil, nil, wlog, opts)
h, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
@ -230,7 +230,7 @@ func BenchmarkLoadWAL(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = w.Dir()
h, err := NewHead(nil, nil, w, opts)
h, err := NewHead(nil, nil, w, opts, nil)
require.NoError(b, err)
h.Init(0)
}
@ -354,7 +354,7 @@ func TestHead_WALMultiRef(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = w.Dir()
head, err = NewHead(nil, nil, w, opts)
head, err = NewHead(nil, nil, w, opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(0))
defer func() {
@ -638,7 +638,7 @@ func TestHeadDeleteSimple(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = reloadedW.Dir()
reloadedHead, err := NewHead(nil, nil, reloadedW, opts)
reloadedHead, err := NewHead(nil, nil, reloadedW, opts, nil)
require.NoError(t, err)
require.NoError(t, reloadedHead.Init(0))
@ -1323,7 +1323,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1
opts.ChunkDirRoot = w.Dir()
h, err := NewHead(nil, nil, w, opts)
h, err := NewHead(nil, nil, w, opts, nil)
require.NoError(t, err)
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
initErr := h.Init(math.MinInt64)
@ -1336,7 +1336,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
// Open the db to trigger a repair.
{
db, err := Open(dir, nil, nil, DefaultOptions())
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
@ -1381,7 +1381,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = chunkRange
opts.ChunkDirRoot = dir
h, err := NewHead(nil, nil, w, opts)
h, err := NewHead(nil, nil, w, opts, nil)
require.NoError(t, err)
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
require.NoError(t, h.Init(math.MinInt64))
@ -1416,7 +1416,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
// Open the db to trigger a repair.
{
db, err := Open(dir, nil, nil, DefaultOptions())
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
@ -1614,7 +1614,7 @@ func TestMemSeriesIsolation(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = wlog.Dir()
hb, err = NewHead(nil, nil, wlog, opts)
hb, err = NewHead(nil, nil, wlog, opts, nil)
defer func() { require.NoError(t, hb.Close()) }()
require.NoError(t, err)
require.NoError(t, hb.Init(0))
@ -1758,7 +1758,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
db, err := Open(dir, nil, nil, DefaultOptions())
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())

View file

@ -40,7 +40,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(b, err)
defer func() {
require.NoError(b, h.Close())
@ -152,7 +152,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
app := h.Appender(context.Background())

View file

@ -409,7 +409,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
t.Run("", func(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 2 * time.Hour.Milliseconds()
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(t, err)
defer h.Close()
@ -1579,7 +1579,7 @@ func TestPostingsForMatchers(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
h, err := NewHead(nil, nil, nil, opts)
h, err := NewHead(nil, nil, nil, opts, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, h.Close())
@ -1844,7 +1844,7 @@ func TestClose(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 0, 10))
createBlock(t, dir, genSeries(1, 1, 10, 20))
db, err := Open(dir, nil, nil, DefaultOptions())
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}

View file

@ -95,7 +95,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
require.NoError(t, r.Close())
// On DB opening all blocks in the base dir should be repaired.
db, err := Open(tmpDir, nil, nil, nil)
db, err := Open(tmpDir, nil, nil, nil, nil)
require.NoError(t, err)
db.Close()

View file

@ -39,7 +39,7 @@ func New(t testutil.T) *TestStorage {
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxExemplars = 10
db, err := tsdb.Open(dir, nil, nil, opts)
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}

View file

@ -154,6 +154,7 @@ type TSDBAdminStats interface {
Snapshot(dir string, withHead bool) error
Stats(statsByLabelName string) (*tsdb.Stats, error)
WALReplayStatus() (tsdb.WALReplayStatus, error)
}
// API can register a set of endpoints in a router and handle
@ -309,6 +310,7 @@ func (api *API) Register(r *route.Router) {
r.Get("/status/buildinfo", wrap(api.serveBuildInfo))
r.Get("/status/flags", wrap(api.serveFlags))
r.Get("/status/tsdb", wrap(api.serveTSDBStatus))
r.Get("/status/walreplay", api.serveWALReplayStatus)
r.Post("/read", api.ready(http.HandlerFunc(api.remoteRead)))
r.Post("/write", api.ready(http.HandlerFunc(api.remoteWrite)))
@ -1351,6 +1353,25 @@ func (api *API) serveTSDBStatus(*http.Request) apiFuncResult {
}, nil, nil, nil}
}
// walReplayStatus is the JSON shape of the /api/v1/status/walreplay response
// body: the first, last, and currently-replayed WAL segment numbers.
type walReplayStatus struct {
	Min     int `json:"min"`
	Max     int `json:"max"`
	Current int `json:"current"`
}
// serveWALReplayStatus handles GET /status/walreplay. It is registered
// outside the ready() wrapper so it can report progress while the server is
// still starting up, hence it writes the response itself rather than going
// through apiFuncResult.
func (api *API) serveWALReplayStatus(w http.ResponseWriter, r *http.Request) {
	httputil.SetCORS(w, api.CORSOrigin, r)
	status, err := api.db.WALReplayStatus()
	if err != nil {
		api.respondError(w, &apiError{errorInternal, err}, nil)
		// Bug fix: without this return the handler fell through and wrote a
		// second (success) response on top of the error response.
		return
	}
	api.respond(w, walReplayStatus{
		Min:     status.Min,
		Max:     status.Max,
		Current: status.Current,
	}, nil)
}
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
// This is only really for tests - this will never be nil IRL.
if api.remoteReadHandler != nil {

View file

@ -2115,9 +2115,12 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) {
}()
opts := tsdb.DefaultHeadOptions()
opts.ChunkRange = 1000
h, _ := tsdb.NewHead(nil, nil, nil, opts)
h, _ := tsdb.NewHead(nil, nil, nil, opts, nil)
return h.Stats(statsByLabelName), nil
}
// WALReplayStatus implements the TSDBAdminStats interface for the test fake:
// it always reports a zero-value status and no error.
func (f *fakeDB) WALReplayStatus() (tsdb.WALReplayStatus, error) {
	return tsdb.WALReplayStatus{}, nil
}
func TestAdminEndpoints(t *testing.T) {
tsdb, tsdbWithError, tsdbNotReady := &fakeDB{}, &fakeDB{err: errors.New("some error")}, &fakeDB{err: errors.Wrap(tsdb.ErrNotReady, "wrap")}

View file

@ -2,13 +2,15 @@ import React, { FC } from 'react';
import Navigation from './Navbar';
import { Container } from 'reactstrap';
import { Router, Redirect } from '@reach/router';
import { Router, Redirect, navigate } from '@reach/router';
import useMedia from 'use-media';
import { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList } from './pages';
import { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList, Starting } from './pages';
import { PathPrefixContext } from './contexts/PathPrefixContext';
import { ThemeContext, themeName, themeSetting } from './contexts/ThemeContext';
import { Theme, themeLocalStorageKey } from './Theme';
import { useLocalStorage } from './hooks/useLocalStorage';
import { useFetchReady } from './hooks/useFetch';
import { usePathPrefix } from './contexts/PathPrefixContext';
interface AppProps {
consolesLink: string | null;
@ -29,6 +31,7 @@ const App: FC<AppProps> = ({ consolesLink }) => {
'/rules',
'/targets',
'/service-discovery',
'/starting',
];
if (basePath.endsWith('/')) {
basePath = basePath.slice(0, -1);
@ -42,6 +45,14 @@ const App: FC<AppProps> = ({ consolesLink }) => {
}
}
const pathPrefix = usePathPrefix();
const { ready, isLoading, isUnexpected } = useFetchReady(pathPrefix);
if (basePath !== '/starting') {
if (!ready && !isLoading && !isUnexpected) {
navigate('/starting');
}
}
const [userTheme, setUserTheme] = useLocalStorage<themeSetting>(themeLocalStorageKey, 'auto');
const browserHasThemes = useMedia('(prefers-color-scheme)');
const browserWantsDarkTheme = useMedia('(prefers-color-scheme: dark)');
@ -76,6 +87,7 @@ const App: FC<AppProps> = ({ consolesLink }) => {
<Status path="/status" />
<TSDBStatus path="/tsdb-status" />
<Targets path="/targets" />
<Starting path="/starting" />
</Router>
</Container>
</PathPrefixContext.Provider>

View file

@ -1,4 +1,6 @@
import { useState, useEffect } from 'react';
import { API_PATH } from '../constants/constants';
import { WALReplayStatus } from '../types/types';
export type APIResponse<T> = { status: string; data: T };
@ -8,6 +10,18 @@ export interface FetchState<T> {
isLoading: boolean;
}
// State returned by useFetchReady: a one-shot probe of the server's
// readiness endpoint.
export interface FetchStateReady {
  // True once /-/ready has answered with HTTP 200.
  ready: boolean;
  // True when the readiness check failed in a way other than the expected
  // "still starting" (503) answer.
  isUnexpected: boolean;
  // True while the readiness request is still in flight.
  isLoading: boolean;
}
// State returned by useFetchReadyInterval: periodic readiness polling plus
// the most recently fetched WAL replay status.
export interface FetchStateReadyInterval {
  // True once /-/ready has answered with HTTP 200.
  ready: boolean;
  // True when the readiness check failed in a way other than the expected
  // "still starting" (503) answer.
  isUnexpected: boolean;
  // Latest response from the status/walreplay endpoint; starts empty.
  walReplayStatus: WALReplayStatus;
}
export const useFetch = <T extends {}>(url: string, options?: RequestInit): FetchState<T> => {
const [response, setResponse] = useState<APIResponse<T>>({ status: 'start fetching' } as any);
const [error, setError] = useState<Error>();
@ -32,3 +46,73 @@ export const useFetch = <T extends {}>(url: string, options?: RequestInit): Fetc
}, [url, options]);
return { response, error, isLoading };
};
// useFetchReady performs a single check of the server's /-/ready endpoint.
// `ready` becomes true on a 200 answer; `isUnexpected` becomes true when the
// response is neither 200 (ready) nor 503 (still starting), or when the
// request itself fails; `isLoading` is true until the request settles.
export const useFetchReady = (pathPrefix: string, options?: RequestInit): FetchStateReady => {
  const [ready, setReady] = useState<boolean>(false);
  const [isUnexpected, setIsUnexpected] = useState<boolean>(false);
  const [isLoading, setIsLoading] = useState<boolean>(true);

  useEffect(() => {
    const fetchData = async () => {
      setIsLoading(true);
      try {
        const res = await fetch(`${pathPrefix}/-/ready`, { cache: 'no-store', credentials: 'same-origin', ...options });
        if (res.status === 200) {
          setReady(true);
        }
        // The server sends back a 503 while starting up; a 200 means it is
        // ready. Anything else means something has gone wrong. (The previous
        // check flagged 200 as unexpected because 200 !== 503.)
        setIsUnexpected(res.status !== 200 && res.status !== 503);
      } catch (error) {
        setIsUnexpected(true);
      } finally {
        // Always clear the loading flag, including on fetch failure, so
        // consumers are not stuck in the loading state forever.
        setIsLoading(false);
      }
    };

    fetchData();
  }, [pathPrefix, options]);

  return { ready, isUnexpected, isLoading };
};
// This is used on the starting page to periodically check if the server is
// ready yet, and to check the status of the WAL replay. Polls /-/ready once a
// second; while the server answers 503 ("still starting") it also fetches the
// WAL replay progress. Polling stops as soon as the server reports ready.
export const useFetchReadyInterval = (pathPrefix: string, options?: RequestInit): FetchStateReadyInterval => {
  const [ready, setReady] = useState<boolean>(false);
  const [isUnexpected, setIsUnexpected] = useState<boolean>(false);
  const [walReplayStatus, setWALReplayStatus] = useState<WALReplayStatus>({} as any);

  useEffect(() => {
    const interval = setInterval(async () => {
      try {
        const res = await fetch(`${pathPrefix}/-/ready`, { cache: 'no-store', credentials: 'same-origin', ...options });
        if (res.status === 200) {
          setReady(true);
          clearInterval(interval);
          return;
        }
        // The server sends back a 503 if it isn't ready,
        // if we get back anything else that means something has gone wrong.
        if (res.status !== 503) {
          setIsUnexpected(true);
          // Zeroed placeholder matching the WALReplayData shape (the old
          // fallback used non-existent `last`/`first` keys hidden by `as any`).
          setWALReplayStatus({ data: { min: 0, max: 0, current: 0 } });
        } else {
          setIsUnexpected(false);
          const statusRes = await fetch(`${pathPrefix}/${API_PATH}/status/walreplay`, {
            cache: 'no-store',
            credentials: 'same-origin',
          });
          if (statusRes.ok) {
            setWALReplayStatus((await statusRes.json()) as WALReplayStatus);
          }
        }
      } catch (error) {
        setIsUnexpected(true);
        setWALReplayStatus({ data: { min: 0, max: 0, current: 0 } });
      }
    }, 1000);

    // Stop polling when the component using the hook unmounts.
    return () => clearInterval(interval);
  }, [pathPrefix, options]);

  return { ready, isUnexpected, walReplayStatus };
};

View file

@ -7,5 +7,6 @@ import Status from './status/Status';
import Targets from './targets/Targets';
import PanelList from './graph/PanelList';
import TSDBStatus from './tsdbStatus/TSDBStatus';
import Starting from './starting/Starting';
export { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList };
export { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList, Starting };

View file

@ -0,0 +1,60 @@
import React, { FC, useEffect } from 'react';
import { RouteComponentProps, navigate } from '@reach/router';
import { Progress, Alert } from 'reactstrap';
import { useFetchReadyInterval } from '../../hooks/useFetch';
import { WALReplayData } from '../../types/types';
import { usePathPrefix } from '../../contexts/PathPrefixContext';
// Props for the starting-page body.
interface StartingContentProps {
  // When true, render an error alert instead of the progress view.
  isUnexpected: boolean;
  // WAL replay progress; undefined until a status fetch has delivered data.
  status?: WALReplayData;
}
// StartingContent renders the "Starting up..." page body: an error alert when
// the server answered unexpectedly, otherwise a WAL replay progress bar once
// replay has advanced past its starting segment.
export const StartingContent: FC<StartingContentProps> = ({ status, isUnexpected }) => {
  if (isUnexpected) {
    return (
      <Alert color="danger">
        <strong>Error:</strong> Server is not responding
      </Alert>
    );
  }

  return (
    <div className="text-center m-3">
      <div className="m-4">
        <h2>Starting up...</h2>
        {/* Guard on `status` narrows the optional prop instead of using
            non-null assertions on optional chains; behavior is unchanged
            (undefined > undefined is false, so the bar stayed hidden). */}
        {status && status.current > status.min ? (
          <div>
            <p>
              Replaying WAL ({status.current}/{status.max})
            </p>
            <Progress
              animated
              value={status.current}
              min={status.min}
              max={status.max}
              // Turn the bar green once replay has caught up to the last segment.
              color={status.max === status.current ? 'success' : undefined}
              style={{ width: '10%', margin: 'auto' }}
            />
          </div>
        ) : null}
      </div>
    </div>
  );
};
// Starting is the route component for the /starting page. It polls the server
// until it is ready, then redirects back to the main UI.
const Starting: FC<RouteComponentProps> = () => {
  const pathPrefix = usePathPrefix();
  const { ready, walReplayStatus, isUnexpected } = useFetchReadyInterval(pathPrefix);

  // Leave the starting page as soon as the server reports ready.
  useEffect(() => {
    if (!ready) {
      return;
    }
    navigate('/');
  }, [ready]);

  return <StartingContent isUnexpected={isUnexpected} status={walReplayStatus.data} />;
};

export default Starting;

View file

@ -24,3 +24,13 @@ export interface Rule {
state: RuleState;
type: string;
}
// Progress counters for the TSDB write-ahead-log replay, mirroring the
// min/max/current fields returned by the status/walreplay API endpoint.
// NOTE(review): presumably these are WAL segment numbers — confirm against
// the server-side walReplayStatus type.
export interface WALReplayData {
  min: number;
  max: number;
  current: number;
}
// Envelope for the WAL replay endpoint's response; `data` is absent until a
// successful fetch has completed.
export interface WALReplayStatus {
  data?: WALReplayData;
}

View file

@ -79,6 +79,7 @@ var reactRouterPaths = []string{
"/status",
"/targets",
"/tsdb-status",
"/starting",
}
// withStackTrace logs the stack trace in case the request panics. The function

View file

@ -104,6 +104,10 @@ func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) {
return a.Head().Stats(statsByLabelName), nil
}
// WALReplayStatus satisfies the stats interface the web handler requires.
// This test adapter has no replay in progress to report, so it returns the
// zero value and no error.
func (a *dbAdapter) WALReplayStatus() (tsdb.WALReplayStatus, error) {
	var status tsdb.WALReplayStatus
	return status, nil
}
func TestReadyAndHealthy(t *testing.T) {
t.Parallel()
@ -111,7 +115,7 @@ func TestReadyAndHealthy(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, os.RemoveAll(dbDir)) }()
db, err := tsdb.Open(dbDir, nil, nil, nil)
db, err := tsdb.Open(dbDir, nil, nil, nil, nil)
require.NoError(t, err)
opts := &Options{
@ -230,7 +234,7 @@ func TestRoutePrefix(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, os.RemoveAll(dbDir)) }()
db, err := tsdb.Open(dbDir, nil, nil, nil)
db, err := tsdb.Open(dbDir, nil, nil, nil, nil)
require.NoError(t, err)
opts := &Options{
@ -395,7 +399,7 @@ func TestShutdownWithStaleConnection(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, os.RemoveAll(dbDir)) }()
db, err := tsdb.Open(dbDir, nil, nil, nil)
db, err := tsdb.Open(dbDir, nil, nil, nil, nil)
require.NoError(t, err)
timeout := 10 * time.Second