diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 55eae9376a..10bebc44b8 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -159,33 +159,17 @@ export abstract class AbstractServer { protected setupPushServer() {} private async setupHealthCheck() { - this.app.use((req, res, next) => { - if (!Db.isInitialized) { - sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); - } else next(); + // health check should not care about DB connections + this.app.get('/healthz', async (req, res) => { + res.send({ status: 'ok' }); }); - // Does very basic health check - this.app.get('/healthz', async (req, res) => { - Logger.debug('Health check started!'); - - const connection = Db.getConnection(); - - try { - if (!connection.isInitialized) { - // Connection is not active - throw new ServiceUnavailableError('No active database connection!'); - } - // DB ping - await connection.query('SELECT 1'); - } catch (error) { - ErrorReporter.error(error); - Logger.error('No Database connection!'); - return sendErrorResponse(res, new ServiceUnavailableError('No Database connection!')); - } - - Logger.debug('Health check completed successfully!'); - sendSuccessResponse(res, { status: 'ok' }, true, 200); + const { connectionState } = Db; + this.app.use((req, res, next) => { + if (connectionState.connected) { + if (connectionState.migrated) next(); + else res.send('n8n is starting up. Please wait'); + } else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); }); if (config.getEnv('executions.mode') === 'queue') { @@ -400,8 +384,8 @@ export abstract class AbstractServer { ); } - async start(): Promise { - const { app, externalHooks, protocol, sslKey, sslCert } = this; + async init(): Promise { + const { app, protocol, sslKey, sslCert } = this; if (protocol === 'https' && sslKey && sslCert) { const https = await import('https'); @@ -431,6 +415,12 @@ export abstract class AbstractServer { await new Promise((resolve) => this.server.listen(PORT, ADDRESS, () => resolve())); + await this.setupHealthCheck(); + + console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); + } + + async start(): Promise { await this.setupErrorHandlers(); this.setupPushServer(); await this.setupCommonMiddlewares(); @@ -438,11 +428,7 @@ export abstract class AbstractServer { this.setupDevMiddlewares(); } - await this.setupHealthCheck(); - await this.configure(); - - console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); console.log(`Version: ${N8N_VERSION}`); const defaultLocale = config.getEnv('defaultLocale'); @@ -450,7 +436,7 @@ export abstract class AbstractServer { console.log(`Locale: ${defaultLocale}`); } - await externalHooks.run('n8n.ready', [this, config]); + await this.externalHooks.run('n8n.ready', [this, config]); } } diff --git a/packages/cli/src/CredentialsHelper.ts b/packages/cli/src/CredentialsHelper.ts index cfc86bfd26..5b20ff91b2 100644 --- a/packages/cli/src/CredentialsHelper.ts +++ b/packages/cli/src/CredentialsHelper.ts @@ -453,12 +453,6 @@ export class CredentialsHelper extends ICredentialsHelper { ): Promise { const credentials = await this.getCredentials(nodeCredentials, type); - if (!Db.isInitialized) { - // The first time executeWorkflow gets called the Database has - // to get initialized first - await Db.init(); - } - credentials.setData(data, this.encryptionKey); const newCredentialsData = credentials.getDataToSave() as ICredentialsDb; diff --git a/packages/cli/src/Db.ts b/packages/cli/src/Db.ts index 114aa22a31..af274427e2 100644 --- a/packages/cli/src/Db.ts +++ b/packages/cli/src/Db.ts @@ -7,6 +7,8 @@ import { Container } from 'typedi'; import type { DataSourceOptions as ConnectionOptions, EntityManager, LoggerOptions } from 'typeorm'; import { DataSource as Connection } from 'typeorm'; import type { TlsOptions } from 'tls'; +import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; + import type { IDatabaseCollections } from '@/Interfaces'; import config from '@/config'; @@ -19,6 +21,7 @@ import { getPostgresConnectionOptions, getSqliteConnectionOptions, } from '@db/config'; +import { inTest } from '@/constants'; import { wrapMigration } from '@db/utils/migrationHelpers'; import type { DatabaseType, Migration } from '@db/types'; import { @@ -43,13 +46,42 @@ import { WorkflowTagMappingRepository, } from '@db/repositories'; -export let isInitialized = false; export const collections = {} as IDatabaseCollections; let connection: Connection; export const getConnection = () => connection!; +type ConnectionState = { + connected: boolean; + migrated: boolean; +}; + +export const connectionState: ConnectionState = { + connected: false, + migrated: false, +}; + +// Ping DB connection every 2 seconds +let pingTimer: NodeJS.Timer | undefined; +if (!inTest) { + const pingDBFn = async () => { + if (connection?.isInitialized) { + try { + await connection.query('SELECT 1'); + connectionState.connected = true; + return; + } catch (error) { + ErrorReporter.error(error); + } finally { + pingTimer = setTimeout(pingDBFn, 2000); + } + } + connectionState.connected = false; + }; + pingTimer = setTimeout(pingDBFn, 2000); +} + export async function transaction(fn: (entityManager: EntityManager) => Promise): Promise { return connection.transaction(fn); } @@ -94,10 +126,14 @@ export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions { } } -export async function init( - testConnectionOptions?: ConnectionOptions, -): Promise { - if (isInitialized) return collections; +const openConnection = async (options: ConnectionOptions) => { + connection = new Connection(options); + await connection.initialize(); + Container.set(Connection, connection); +}; + +export async function init(testConnectionOptions?: ConnectionOptions): Promise { + if (connectionState.connected) return; const dbType = config.getEnv('database.type'); const connectionOptions = testConnectionOptions ?? getConnectionOptions(dbType); @@ -124,9 +160,7 @@ export async function init( migrationsRun: false, }); - connection = new Connection(connectionOptions); - Container.set(Connection, connection); - await connection.initialize(); + await openConnection(connectionOptions); if (dbType === 'postgresdb') { const schema = config.getEnv('database.postgresdb.schema'); @@ -138,9 +172,13 @@ export async function init( await connection.query(`SET search_path TO ${searchPath.join(',')};`); } - (connectionOptions.migrations as Migration[]).forEach(wrapMigration); + connectionState.connected = true; +} - if (!testConnectionOptions && dbType === 'sqlite') { +export async function migrate() { + (connection.options.migrations as Migration[]).forEach(wrapMigration); + + if (!inTest && connection.options.type === 'sqlite') { // This specific migration changes database metadata. // A field is now nullable. We need to reconnect so that // n8n knows it has changed. Happens only on sqlite. @@ -161,9 +199,7 @@ export async function init( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access if (migrations.length === 0) { await connection.destroy(); - connection = new Connection(connectionOptions); - Container.set(Connection, connection); - await connection.initialize(); + await openConnection(connection.options); } } else { await connection.runMigrations({ transaction: 'each' }); @@ -189,7 +225,14 @@ export async function init( collections.WorkflowStatistics = Container.get(WorkflowStatisticsRepository); collections.WorkflowTagMapping = Container.get(WorkflowTagMappingRepository); - isInitialized = true; - - return collections; + connectionState.migrated = true; } + +export const close = async () => { + if (pingTimer) { + clearTimeout(pingTimer); + pingTimer = undefined; + } + + if (connection.isInitialized) await connection.destroy(); +}; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index ddaf2a6f90..f5406d2b2d 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -170,7 +170,7 @@ import { VersionControlController } from '@/environments/versionControl/versionC const exec = promisify(callbackExec); -class Server extends AbstractServer { +export class Server extends AbstractServer { endpointPresetCredentials: string; waitTracker: WaitTracker; @@ -198,23 +198,6 @@ class Server extends AbstractServer { this.app.set('view engine', 'handlebars'); this.app.set('views', TEMPLATES_DIR); - this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials); - this.credentialTypes = Container.get(CredentialTypes); - this.nodeTypes = Container.get(NodeTypes); - - this.activeExecutionsInstance = Container.get(ActiveExecutions); - this.waitTracker = Container.get(WaitTracker); - this.postHog = Container.get(PostHogClient); - - this.presetCredentialsLoaded = false; - this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); - - this.push = Container.get(Push); - - if (process.env.E2E_TESTS === 'true') { - this.app.use('/e2e', require('./api/e2e.api').e2eController); - } - const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const telemetrySettings: ITelemetrySettings = { enabled: config.getEnv('diagnostics.enabled'), @@ -339,6 +322,88 @@ class Server extends AbstractServer { }; } + async start() { + this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials); + this.credentialTypes = Container.get(CredentialTypes); + this.nodeTypes = Container.get(NodeTypes); + + this.activeExecutionsInstance = Container.get(ActiveExecutions); + this.waitTracker = Container.get(WaitTracker); + this.postHog = Container.get(PostHogClient); + + this.presetCredentialsLoaded = false; + this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); + + this.push = Container.get(Push); + + if (process.env.E2E_TESTS === 'true') { + this.app.use('/e2e', require('./api/e2e.api').e2eController); + } + + await super.start(); + + const cpus = os.cpus(); + const binaryDataConfig = config.getEnv('binaryDataManager'); + const diagnosticInfo: IDiagnosticInfo = { + basicAuthActive: config.getEnv('security.basicAuth.active'), + databaseType: config.getEnv('database.type'), + disableProductionWebhooksOnMainProcess: config.getEnv( + 'endpoints.disableProductionWebhooksOnMainProcess', + ), + notificationsEnabled: config.getEnv('versionNotifications.enabled'), + versionCli: N8N_VERSION, + systemInfo: { + os: { + type: os.type(), + version: os.version(), + }, + memory: os.totalmem() / 1024, + cpus: { + count: cpus.length, + model: cpus[0].model, + speed: cpus[0].speed, + }, + }, + executionVariables: { + executions_process: config.getEnv('executions.process'), + executions_mode: config.getEnv('executions.mode'), + executions_timeout: config.getEnv('executions.timeout'), + executions_timeout_max: config.getEnv('executions.maxTimeout'), + executions_data_save_on_error: config.getEnv('executions.saveDataOnError'), + executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'), + executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'), + executions_data_save_manual_executions: config.getEnv( + 'executions.saveDataManualExecutions', + ), + executions_data_prune: config.getEnv('executions.pruneData'), + executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'), + executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'), + }, + deploymentType: config.getEnv('deployment.type'), + binaryDataMode: binaryDataConfig.mode, + n8n_multi_user_allowed: isUserManagementEnabled(), + smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp', + ldap_allowed: isLdapCurrentAuthenticationMethod(), + saml_enabled: isSamlCurrentAuthenticationMethod(), + }; + + // Set up event handling + initEvents(); + + if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { + const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials'); + await reloadNodesAndCredentials(this.loadNodesAndCredentials, this.nodeTypes, this.push); + } + + void Db.collections.Workflow.findOne({ + select: ['createdAt'], + order: { createdAt: 'ASC' }, + where: {}, + }).then(async (workflow) => + Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt), + ); + } + /** * Returns the current settings for the frontend */ @@ -1379,67 +1444,3 @@ class Server extends AbstractServer { setupPushServer(restEndpoint, server, app); } } - -export async function start(): Promise { - const app = new Server(); - await app.start(); - - const cpus = os.cpus(); - const binaryDataConfig = config.getEnv('binaryDataManager'); - const diagnosticInfo: IDiagnosticInfo = { - basicAuthActive: config.getEnv('security.basicAuth.active'), - databaseType: config.getEnv('database.type'), - disableProductionWebhooksOnMainProcess: config.getEnv( - 'endpoints.disableProductionWebhooksOnMainProcess', - ), - notificationsEnabled: config.getEnv('versionNotifications.enabled'), - versionCli: N8N_VERSION, - systemInfo: { - os: { - type: os.type(), - version: os.version(), - }, - memory: os.totalmem() / 1024, - cpus: { - count: cpus.length, - model: cpus[0].model, - speed: cpus[0].speed, - }, - }, - executionVariables: { - executions_process: config.getEnv('executions.process'), - executions_mode: config.getEnv('executions.mode'), - executions_timeout: config.getEnv('executions.timeout'), - executions_timeout_max: config.getEnv('executions.maxTimeout'), - executions_data_save_on_error: config.getEnv('executions.saveDataOnError'), - executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'), - executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'), - executions_data_save_manual_executions: config.getEnv('executions.saveDataManualExecutions'), - executions_data_prune: config.getEnv('executions.pruneData'), - executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'), - executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'), - }, - deploymentType: config.getEnv('deployment.type'), - binaryDataMode: binaryDataConfig.mode, - n8n_multi_user_allowed: isUserManagementEnabled(), - smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp', - ldap_allowed: isLdapCurrentAuthenticationMethod(), - saml_enabled: isSamlCurrentAuthenticationMethod(), - }; - - // Set up event handling - initEvents(); - - if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { - const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials'); - await reloadNodesAndCredentials(app.loadNodesAndCredentials, app.nodeTypes, app.push); - } - - void Db.collections.Workflow.findOne({ - select: ['createdAt'], - order: { createdAt: 'ASC' }, - where: {}, - }).then(async (workflow) => - Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt), - ); -} diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 9cc7aac265..f19278700b 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -927,12 +927,6 @@ export async function getWorkflowData( let workflowData: IWorkflowBase | null; if (workflowInfo.id !== undefined) { - if (!Db.isInitialized) { - // The first time executeWorkflow gets called the Database has - // to get initialized first - await Db.init(); - } - const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags']; workflowData = await WorkflowsService.get({ id: workflowInfo.id }, { relations }); diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 2bd7a88572..e629458b91 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -5,6 +5,7 @@ import type { INodeTypes } from 'n8n-workflow'; import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow'; import type { IUserSettings } from 'n8n-core'; import { BinaryDataManager, UserSettings } from 'n8n-core'; +import type { AbstractServer } from '@/AbstractServer'; import { getLogger } from '@/Logger'; import config from '@/config'; import * as Db from '@/Db'; @@ -36,6 +37,8 @@ export abstract class BaseCommand extends Command { protected instanceId: string; + protected server?: AbstractServer; + async init(): Promise { await initErrorHandling(); @@ -55,6 +58,12 @@ export abstract class BaseCommand extends Command { this.exitWithCrash('There was an error initializing DB', error), ); + await this.server?.init(); + + await Db.migrate().catch(async (error: Error) => + this.exitWithCrash('There was an error running database migrations', error), + ); + if (process.env.WEBHOOK_TUNNEL_URL) { LoggerProxy.warn( 'You are still using the WEBHOOK_TUNNEL_URL environment variable. It has been deprecated and will be removed in a future version of n8n. Please switch to using WEBHOOK_URL instead.', @@ -112,9 +121,9 @@ export abstract class BaseCommand extends Command { async finally(error: Error | undefined) { if (inTest || this.id === 'start') return; - if (Db.isInitialized) { + if (Db.connectionState.connected) { await sleep(100); // give any in-flight query some time to finish - await Db.getConnection().destroy(); + await Db.close(); } const exitCode = error instanceof ExitError ? error.oclif.exit : error ? 1 : 0; this.exit(exitCode); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 358d8091eb..016fc48a9e 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -21,7 +21,7 @@ import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import * as Db from '@/Db'; import * as GenericHelpers from '@/GenericHelpers'; -import * as Server from '@/Server'; +import { Server } from '@/Server'; import { TestWebhooks } from '@/TestWebhooks'; import { getAllInstalledPackages } from '@/CommunityNodes/packageModel'; import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants'; @@ -62,6 +62,8 @@ export class Start extends BaseCommand { protected activeWorkflowRunner: ActiveWorkflowRunner; + protected server = new Server(); + /** * Opens the UI in browser */ @@ -208,6 +210,7 @@ export class Start extends BaseCommand { async init() { await this.initCrashJournal(); + await super.init(); this.logger.info('Initializing n8n process'); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); @@ -351,7 +354,7 @@ export class Start extends BaseCommand { ); } - await Server.start(); + await this.server.start(); // Start to get active workflows and run their triggers await this.activeWorkflowRunner.init(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index cc1ac53b35..da2fc44315 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -16,6 +16,8 @@ export class Webhook extends BaseCommand { help: flags.help({ char: 'h' }), }; + protected server = new WebhookServer(); + /** * Stops n8n in a graceful way. * Make for example sure that all the webhooks from third party services @@ -81,7 +83,7 @@ export class Webhook extends BaseCommand { async run() { await Container.get(Queue).init(); - await new WebhookServer().start(); + await this.server.start(); this.logger.info('Webhook listener waiting for requests.'); // Make sure that the process does not close diff --git a/packages/cli/test/integration/shared/testDb.ts b/packages/cli/test/integration/shared/testDb.ts index 5cc910367d..08eb9fce2e 100644 --- a/packages/cli/test/integration/shared/testDb.ts +++ b/packages/cli/test/integration/shared/testDb.ts @@ -58,7 +58,7 @@ export async function init() { if (dbType === 'sqlite') { // no bootstrap connection required - return Db.init(getSqliteOptions({ name: testDbName })); + await Db.init(getSqliteOptions({ name: testDbName })); } if (dbType === 'postgresdb') { @@ -89,7 +89,7 @@ export async function init() { await bootstrapPostgres.query(`CREATE DATABASE ${testDbName}`); await bootstrapPostgres.destroy(); - return Db.init(getDBOptions('postgres', testDbName)); + await Db.init(getDBOptions('postgres', testDbName)); } if (dbType === 'mysqldb') { @@ -97,18 +97,17 @@ export async function init() { await bootstrapMysql.query(`CREATE DATABASE ${testDbName}`); await bootstrapMysql.destroy(); - return Db.init(getDBOptions('mysql', testDbName)); + await Db.init(getDBOptions('mysql', testDbName)); } - throw new Error(`Unrecognized DB type: ${dbType}`); + await Db.migrate(); } /** * Drop test DB, closing bootstrap connection if existing. */ export async function terminate() { - const connection = Db.getConnection(); - if (connection.isInitialized) await connection.destroy(); + await Db.close(); } /**