feat(core): Improve health check (#6205)

* remove unnecesary Db re-initialization

this is from before we added `Db.init` in `WorkflowRunnerProcess`

* feat(core): Improved health check

* make health check not care about DB connections

* close DB connections, and shutdown the timer
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-05-10 08:27:04 +00:00 committed by GitHub
parent e3f47994b1
commit 9e7b9fb443
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 184 additions and 153 deletions

View file

@ -159,33 +159,17 @@ export abstract class AbstractServer {
protected setupPushServer() {}
private async setupHealthCheck() {
this.app.use((req, res, next) => {
if (!Db.isInitialized) {
sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
} else next();
// health check should not care about DB connections
this.app.get('/healthz', async (req, res) => {
res.send({ status: 'ok' });
});
// Does very basic health check
this.app.get('/healthz', async (req, res) => {
Logger.debug('Health check started!');
const connection = Db.getConnection();
try {
if (!connection.isInitialized) {
// Connection is not active
throw new ServiceUnavailableError('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
} catch (error) {
ErrorReporter.error(error);
Logger.error('No Database connection!');
return sendErrorResponse(res, new ServiceUnavailableError('No Database connection!'));
}
Logger.debug('Health check completed successfully!');
sendSuccessResponse(res, { status: 'ok' }, true, 200);
const { connectionState } = Db;
this.app.use((req, res, next) => {
if (connectionState.connected) {
if (connectionState.migrated) next();
else res.send('n8n is starting up. Please wait');
} else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
});
if (config.getEnv('executions.mode') === 'queue') {
@ -400,8 +384,8 @@ export abstract class AbstractServer {
);
}
async start(): Promise<void> {
const { app, externalHooks, protocol, sslKey, sslCert } = this;
async init(): Promise<void> {
const { app, protocol, sslKey, sslCert } = this;
if (protocol === 'https' && sslKey && sslCert) {
const https = await import('https');
@ -431,6 +415,12 @@ export abstract class AbstractServer {
await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));
await this.setupHealthCheck();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
}
async start(): Promise<void> {
await this.setupErrorHandlers();
this.setupPushServer();
await this.setupCommonMiddlewares();
@ -438,11 +428,7 @@ export abstract class AbstractServer {
this.setupDevMiddlewares();
}
await this.setupHealthCheck();
await this.configure();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
console.log(`Version: ${N8N_VERSION}`);
const defaultLocale = config.getEnv('defaultLocale');
@ -450,7 +436,7 @@ export abstract class AbstractServer {
console.log(`Locale: ${defaultLocale}`);
}
await externalHooks.run('n8n.ready', [this, config]);
await this.externalHooks.run('n8n.ready', [this, config]);
}
}

View file

@ -453,12 +453,6 @@ export class CredentialsHelper extends ICredentialsHelper {
): Promise<void> {
const credentials = await this.getCredentials(nodeCredentials, type);
if (!Db.isInitialized) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
credentials.setData(data, this.encryptionKey);
const newCredentialsData = credentials.getDataToSave() as ICredentialsDb;

View file

@ -7,6 +7,8 @@ import { Container } from 'typedi';
import type { DataSourceOptions as ConnectionOptions, EntityManager, LoggerOptions } from 'typeorm';
import { DataSource as Connection } from 'typeorm';
import type { TlsOptions } from 'tls';
import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import type { IDatabaseCollections } from '@/Interfaces';
import config from '@/config';
@ -19,6 +21,7 @@ import {
getPostgresConnectionOptions,
getSqliteConnectionOptions,
} from '@db/config';
import { inTest } from '@/constants';
import { wrapMigration } from '@db/utils/migrationHelpers';
import type { DatabaseType, Migration } from '@db/types';
import {
@ -43,13 +46,42 @@ import {
WorkflowTagMappingRepository,
} from '@db/repositories';
export let isInitialized = false;
export const collections = {} as IDatabaseCollections;
let connection: Connection;
export const getConnection = () => connection!;
type ConnectionState = {
connected: boolean;
migrated: boolean;
};
export const connectionState: ConnectionState = {
connected: false,
migrated: false,
};
// Ping DB connection every 2 seconds
let pingTimer: NodeJS.Timer | undefined;
if (!inTest) {
const pingDBFn = async () => {
if (connection?.isInitialized) {
try {
await connection.query('SELECT 1');
connectionState.connected = true;
return;
} catch (error) {
ErrorReporter.error(error);
} finally {
pingTimer = setTimeout(pingDBFn, 2000);
}
}
connectionState.connected = false;
};
pingTimer = setTimeout(pingDBFn, 2000);
}
export async function transaction<T>(fn: (entityManager: EntityManager) => Promise<T>): Promise<T> {
return connection.transaction(fn);
}
@ -94,10 +126,14 @@ export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions {
}
}
export async function init(
testConnectionOptions?: ConnectionOptions,
): Promise<IDatabaseCollections> {
if (isInitialized) return collections;
const openConnection = async (options: ConnectionOptions) => {
connection = new Connection(options);
await connection.initialize();
Container.set(Connection, connection);
};
export async function init(testConnectionOptions?: ConnectionOptions): Promise<void> {
if (connectionState.connected) return;
const dbType = config.getEnv('database.type');
const connectionOptions = testConnectionOptions ?? getConnectionOptions(dbType);
@ -124,9 +160,7 @@ export async function init(
migrationsRun: false,
});
connection = new Connection(connectionOptions);
Container.set(Connection, connection);
await connection.initialize();
await openConnection(connectionOptions);
if (dbType === 'postgresdb') {
const schema = config.getEnv('database.postgresdb.schema');
@ -138,9 +172,13 @@ export async function init(
await connection.query(`SET search_path TO ${searchPath.join(',')};`);
}
(connectionOptions.migrations as Migration[]).forEach(wrapMigration);
connectionState.connected = true;
}
if (!testConnectionOptions && dbType === 'sqlite') {
export async function migrate() {
(connection.options.migrations as Migration[]).forEach(wrapMigration);
if (!inTest && connection.options.type === 'sqlite') {
// This specific migration changes database metadata.
// A field is now nullable. We need to reconnect so that
// n8n knows it has changed. Happens only on sqlite.
@ -161,9 +199,7 @@ export async function init(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
if (migrations.length === 0) {
await connection.destroy();
connection = new Connection(connectionOptions);
Container.set(Connection, connection);
await connection.initialize();
await openConnection(connection.options);
}
} else {
await connection.runMigrations({ transaction: 'each' });
@ -189,7 +225,14 @@ export async function init(
collections.WorkflowStatistics = Container.get(WorkflowStatisticsRepository);
collections.WorkflowTagMapping = Container.get(WorkflowTagMappingRepository);
isInitialized = true;
return collections;
connectionState.migrated = true;
}
export const close = async () => {
if (pingTimer) {
clearTimeout(pingTimer);
pingTimer = undefined;
}
if (connection.isInitialized) await connection.destroy();
};

View file

@ -170,7 +170,7 @@ import { VersionControlController } from '@/environments/versionControl/versionC
const exec = promisify(callbackExec);
class Server extends AbstractServer {
export class Server extends AbstractServer {
endpointPresetCredentials: string;
waitTracker: WaitTracker;
@ -198,23 +198,6 @@ class Server extends AbstractServer {
this.app.set('view engine', 'handlebars');
this.app.set('views', TEMPLATES_DIR);
this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials);
this.credentialTypes = Container.get(CredentialTypes);
this.nodeTypes = Container.get(NodeTypes);
this.activeExecutionsInstance = Container.get(ActiveExecutions);
this.waitTracker = Container.get(WaitTracker);
this.postHog = Container.get(PostHogClient);
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
this.push = Container.get(Push);
if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
}
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const telemetrySettings: ITelemetrySettings = {
enabled: config.getEnv('diagnostics.enabled'),
@ -339,6 +322,88 @@ class Server extends AbstractServer {
};
}
async start() {
this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials);
this.credentialTypes = Container.get(CredentialTypes);
this.nodeTypes = Container.get(NodeTypes);
this.activeExecutionsInstance = Container.get(ActiveExecutions);
this.waitTracker = Container.get(WaitTracker);
this.postHog = Container.get(PostHogClient);
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
this.push = Container.get(Push);
if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
}
await super.start();
const cpus = os.cpus();
const binaryDataConfig = config.getEnv('binaryDataManager');
const diagnosticInfo: IDiagnosticInfo = {
basicAuthActive: config.getEnv('security.basicAuth.active'),
databaseType: config.getEnv('database.type'),
disableProductionWebhooksOnMainProcess: config.getEnv(
'endpoints.disableProductionWebhooksOnMainProcess',
),
notificationsEnabled: config.getEnv('versionNotifications.enabled'),
versionCli: N8N_VERSION,
systemInfo: {
os: {
type: os.type(),
version: os.version(),
},
memory: os.totalmem() / 1024,
cpus: {
count: cpus.length,
model: cpus[0].model,
speed: cpus[0].speed,
},
},
executionVariables: {
executions_process: config.getEnv('executions.process'),
executions_mode: config.getEnv('executions.mode'),
executions_timeout: config.getEnv('executions.timeout'),
executions_timeout_max: config.getEnv('executions.maxTimeout'),
executions_data_save_on_error: config.getEnv('executions.saveDataOnError'),
executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'),
executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'),
executions_data_save_manual_executions: config.getEnv(
'executions.saveDataManualExecutions',
),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
n8n_multi_user_allowed: isUserManagementEnabled(),
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
ldap_allowed: isLdapCurrentAuthenticationMethod(),
saml_enabled: isSamlCurrentAuthenticationMethod(),
};
// Set up event handling
initEvents();
if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') {
const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials');
await reloadNodesAndCredentials(this.loadNodesAndCredentials, this.nodeTypes, this.push);
}
void Db.collections.Workflow.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },
where: {},
}).then(async (workflow) =>
Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt),
);
}
/**
* Returns the current settings for the frontend
*/
@ -1379,67 +1444,3 @@ class Server extends AbstractServer {
setupPushServer(restEndpoint, server, app);
}
}
export async function start(): Promise<void> {
const app = new Server();
await app.start();
const cpus = os.cpus();
const binaryDataConfig = config.getEnv('binaryDataManager');
const diagnosticInfo: IDiagnosticInfo = {
basicAuthActive: config.getEnv('security.basicAuth.active'),
databaseType: config.getEnv('database.type'),
disableProductionWebhooksOnMainProcess: config.getEnv(
'endpoints.disableProductionWebhooksOnMainProcess',
),
notificationsEnabled: config.getEnv('versionNotifications.enabled'),
versionCli: N8N_VERSION,
systemInfo: {
os: {
type: os.type(),
version: os.version(),
},
memory: os.totalmem() / 1024,
cpus: {
count: cpus.length,
model: cpus[0].model,
speed: cpus[0].speed,
},
},
executionVariables: {
executions_process: config.getEnv('executions.process'),
executions_mode: config.getEnv('executions.mode'),
executions_timeout: config.getEnv('executions.timeout'),
executions_timeout_max: config.getEnv('executions.maxTimeout'),
executions_data_save_on_error: config.getEnv('executions.saveDataOnError'),
executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'),
executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'),
executions_data_save_manual_executions: config.getEnv('executions.saveDataManualExecutions'),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
n8n_multi_user_allowed: isUserManagementEnabled(),
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
ldap_allowed: isLdapCurrentAuthenticationMethod(),
saml_enabled: isSamlCurrentAuthenticationMethod(),
};
// Set up event handling
initEvents();
if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') {
const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials');
await reloadNodesAndCredentials(app.loadNodesAndCredentials, app.nodeTypes, app.push);
}
void Db.collections.Workflow.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },
where: {},
}).then(async (workflow) =>
Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt),
);
}

View file

@ -927,12 +927,6 @@ export async function getWorkflowData(
let workflowData: IWorkflowBase | null;
if (workflowInfo.id !== undefined) {
if (!Db.isInitialized) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags'];
workflowData = await WorkflowsService.get({ id: workflowInfo.id }, { relations });

View file

@ -5,6 +5,7 @@ import type { INodeTypes } from 'n8n-workflow';
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import type { IUserSettings } from 'n8n-core';
import { BinaryDataManager, UserSettings } from 'n8n-core';
import type { AbstractServer } from '@/AbstractServer';
import { getLogger } from '@/Logger';
import config from '@/config';
import * as Db from '@/Db';
@ -36,6 +37,8 @@ export abstract class BaseCommand extends Command {
protected instanceId: string;
protected server?: AbstractServer;
async init(): Promise<void> {
await initErrorHandling();
@ -55,6 +58,12 @@ export abstract class BaseCommand extends Command {
this.exitWithCrash('There was an error initializing DB', error),
);
await this.server?.init();
await Db.migrate().catch(async (error: Error) =>
this.exitWithCrash('There was an error running database migrations', error),
);
if (process.env.WEBHOOK_TUNNEL_URL) {
LoggerProxy.warn(
'You are still using the WEBHOOK_TUNNEL_URL environment variable. It has been deprecated and will be removed in a future version of n8n. Please switch to using WEBHOOK_URL instead.',
@ -112,9 +121,9 @@ export abstract class BaseCommand extends Command {
async finally(error: Error | undefined) {
if (inTest || this.id === 'start') return;
if (Db.isInitialized) {
if (Db.connectionState.connected) {
await sleep(100); // give any in-flight query some time to finish
await Db.getConnection().destroy();
await Db.close();
}
const exitCode = error instanceof ExitError ? error.oclif.exit : error ? 1 : 0;
this.exit(exitCode);

View file

@ -21,7 +21,7 @@ import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import * as Server from '@/Server';
import { Server } from '@/Server';
import { TestWebhooks } from '@/TestWebhooks';
import { getAllInstalledPackages } from '@/CommunityNodes/packageModel';
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
@ -62,6 +62,8 @@ export class Start extends BaseCommand {
protected activeWorkflowRunner: ActiveWorkflowRunner;
protected server = new Server();
/**
* Opens the UI in browser
*/
@ -208,6 +210,7 @@ export class Start extends BaseCommand {
async init() {
await this.initCrashJournal();
await super.init();
this.logger.info('Initializing n8n process');
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
@ -351,7 +354,7 @@ export class Start extends BaseCommand {
);
}
await Server.start();
await this.server.start();
// Start to get active workflows and run their triggers
await this.activeWorkflowRunner.init();

View file

@ -16,6 +16,8 @@ export class Webhook extends BaseCommand {
help: flags.help({ char: 'h' }),
};
protected server = new WebhookServer();
/**
* Stops n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
@ -81,7 +83,7 @@ export class Webhook extends BaseCommand {
async run() {
await Container.get(Queue).init();
await new WebhookServer().start();
await this.server.start();
this.logger.info('Webhook listener waiting for requests.');
// Make sure that the process does not close

View file

@ -58,7 +58,7 @@ export async function init() {
if (dbType === 'sqlite') {
// no bootstrap connection required
return Db.init(getSqliteOptions({ name: testDbName }));
await Db.init(getSqliteOptions({ name: testDbName }));
}
if (dbType === 'postgresdb') {
@ -89,7 +89,7 @@ export async function init() {
await bootstrapPostgres.query(`CREATE DATABASE ${testDbName}`);
await bootstrapPostgres.destroy();
return Db.init(getDBOptions('postgres', testDbName));
await Db.init(getDBOptions('postgres', testDbName));
}
if (dbType === 'mysqldb') {
@ -97,18 +97,17 @@ export async function init() {
await bootstrapMysql.query(`CREATE DATABASE ${testDbName}`);
await bootstrapMysql.destroy();
return Db.init(getDBOptions('mysql', testDbName));
await Db.init(getDBOptions('mysql', testDbName));
}
throw new Error(`Unrecognized DB type: ${dbType}`);
await Db.migrate();
}
/**
* Drop test DB, closing bootstrap connection if existing.
*/
export async function terminate() {
const connection = Db.getConnection();
if (connection.isInitialized) await connection.destroy();
await Db.close();
}
/**