refactor(core): Delete duplicate code across all commands (#5452)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-02-10 14:59:20 +01:00 committed by GitHub
parent 8494c97821
commit 5194513850
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 979 additions and 1345 deletions

View file

@ -64,6 +64,7 @@ import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks'; import { ExternalHooks } from '@/ExternalHooks';
import { whereClause } from './UserManagement/UserManagementHelper'; import { whereClause } from './UserManagement/UserManagementHelper';
import { WorkflowsService } from './workflows/workflows.services'; import { WorkflowsService } from './workflows/workflows.services';
import { START_NODES } from './constants';
const WEBHOOK_PROD_UNREGISTERED_HINT = const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@ -801,10 +802,7 @@ export class ActiveWorkflowRunner {
settings: workflowData.settings, settings: workflowData.settings,
}); });
const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated([ const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(START_NODES);
'n8n-nodes-base.start',
'n8n-nodes-base.manualTrigger',
]);
if (!canBeActivated) { if (!canBeActivated) {
Logger.error(`Unable to activate workflow "${workflowData.name}"`); Logger.error(`Unable to activate workflow "${workflowData.name}"`);
throw new Error( throw new Error(

View file

@ -30,6 +30,7 @@ import {
RESPONSE_ERROR_MESSAGES, RESPONSE_ERROR_MESSAGES,
CUSTOM_API_CALL_KEY, CUSTOM_API_CALL_KEY,
CUSTOM_API_CALL_NAME, CUSTOM_API_CALL_NAME,
inTest,
} from '@/constants'; } from '@/constants';
import { import {
persistInstalledPackageData, persistInstalledPackageData,
@ -61,7 +62,7 @@ export class LoadNodesAndCredentialsClass implements INodesAndCredentials {
// @ts-ignore // @ts-ignore
// eslint-disable-next-line @typescript-eslint/no-unsafe-call // eslint-disable-next-line @typescript-eslint/no-unsafe-call
module.constructor._initPaths(); if (!inTest) module.constructor._initPaths();
await this.loadNodesFromBasePackages(); await this.loadNodesFromBasePackages();
await this.loadNodesFromDownloadedPackages(); await this.loadNodesFromDownloadedPackages();

View file

@ -11,6 +11,7 @@ import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { isInstanceOwner } from '../users/users.service'; import { isInstanceOwner } from '../users/users.service';
import type { Role } from '@db/entities/Role'; import type { Role } from '@db/entities/Role';
import config from '@/config'; import config from '@/config';
import { START_NODES } from '@/constants';
function insertIf(condition: boolean, elements: string[]): string[] { function insertIf(condition: boolean, elements: string[]): string[] {
return condition ? elements : []; return condition ? elements : [];
@ -128,7 +129,7 @@ export async function updateWorkflow(
export function hasStartNode(workflow: WorkflowEntity): boolean { export function hasStartNode(workflow: WorkflowEntity): boolean {
if (!workflow.nodes.length) return false; if (!workflow.nodes.length) return false;
const found = workflow.nodes.find((node) => node.type === 'n8n-nodes-base.start'); const found = workflow.nodes.find((node) => START_NODES.includes(node.type));
return Boolean(found); return Boolean(found);
} }

View file

@ -5,6 +5,7 @@ import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions'; import * as ActiveExecutions from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers'; import * as WebhookHelpers from '@/WebhookHelpers';
export type JobId = Bull.JobId;
export type Job = Bull.Job<JobData>; export type Job = Bull.Job<JobData>;
export type JobQueue = Bull.Queue<JobData>; export type JobQueue = Bull.Queue<JobData>;
@ -55,7 +56,7 @@ export class Queue {
return this.jobQueue.add(jobData, jobOptions); return this.jobQueue.add(jobData, jobOptions);
} }
async getJob(jobId: Bull.JobId): Promise<Job | null> { async getJob(jobId: JobId): Promise<Job | null> {
return this.jobQueue.getJob(jobId); return this.jobQueue.getJob(jobId);
} }

View file

@ -101,20 +101,20 @@ class WorkflowRunnerProcess {
this.startedAt = new Date(); this.startedAt = new Date();
const userSettings = await UserSettings.prepareUserSettings();
const loadNodesAndCredentials = LoadNodesAndCredentials(); const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init(); await loadNodesAndCredentials.init();
const nodeTypes = NodeTypes(loadNodesAndCredentials); const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials); const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes); CredentialsOverwrites(credentialTypes);
// Load all external hooks // Load all external hooks
const externalHooks = ExternalHooks(); const externalHooks = ExternalHooks();
await externalHooks.init(); await externalHooks.init();
const instanceId = (await UserSettings.prepareUserSettings()).instanceId ?? ''; const instanceId = userSettings.instanceId ?? '';
await InternalHooksManager.init(instanceId, nodeTypes); await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager'); const binaryDataConfig = config.getEnv('binaryDataManager');

View file

@ -1,59 +1,94 @@
import { Command } from '@oclif/core'; import { Command } from '@oclif/command';
import { LoggerProxy } from 'n8n-workflow'; import type { INodeTypes } from 'n8n-workflow';
import type { Logger } from '@/Logger'; import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import type { IUserSettings } from 'n8n-core';
import { BinaryDataManager, UserSettings } from 'n8n-core';
import { getLogger } from '@/Logger'; import { getLogger } from '@/Logger';
import { User } from '@db/entities/User'; import config from '@/config';
import * as Db from '@/Db'; import * as Db from '@/Db';
import * as CrashJournal from '@/CrashJournal';
import { inTest } from '@/constants'; import { inTest } from '@/constants';
import { CredentialTypes } from '@/CredentialTypes';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { InternalHooksManager } from '@/InternalHooksManager';
import { initErrorHandling } from '@/ErrorReporting';
import { ExternalHooks } from '@/ExternalHooks';
import { NodeTypes } from '@/NodeTypes';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { IExternalHooksClass } from '@/Interfaces';
export const UM_FIX_INSTRUCTION =
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';
export abstract class BaseCommand extends Command { export abstract class BaseCommand extends Command {
logger: Logger; protected logger = LoggerProxy.init(getLogger());
/** protected externalHooks: IExternalHooksClass;
* Lifecycle methods
*/ protected loadNodesAndCredentials: LoadNodesAndCredentialsClass;
protected nodeTypes: INodeTypes;
protected userSettings: IUserSettings;
async init(): Promise<void> { async init(): Promise<void> {
this.logger = getLogger(); await initErrorHandling();
LoggerProxy.init(this.logger);
await Db.init(); // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.stopProcess = this.stopProcess.bind(this);
// Make sure the settings exist
this.userSettings = await UserSettings.prepareUserSettings();
this.loadNodesAndCredentials = LoadNodesAndCredentials();
await this.loadNodesAndCredentials.init();
this.nodeTypes = NodeTypes(this.loadNodesAndCredentials);
const credentialTypes = CredentialTypes(this.loadNodesAndCredentials);
CredentialsOverwrites(credentialTypes);
await InternalHooksManager.init(this.userSettings.instanceId ?? '', this.nodeTypes);
await Db.init().catch(async (error: Error) =>
this.exitWithCrash('There was an error initializing DB', error),
);
} }
async finally(): Promise<void> { protected async stopProcess() {
if (inTest) return; // This needs to be overridden
this.exit();
} }
/** protected async initCrashJournal() {
* User Management utils await CrashJournal.init();
*/ }
defaultUserProps = { protected async exitSuccessFully() {
firstName: null, try {
lastName: null, await CrashJournal.cleanup();
email: null, } finally {
password: null, process.exit();
resetPasswordToken: null, }
}; }
async getInstanceOwner(): Promise<User> { protected async exitWithCrash(message: string, error: unknown) {
const globalRole = await Db.collections.Role.findOneByOrFail({ ErrorReporter.error(new Error(message, { cause: error }), { level: 'fatal' });
name: 'owner', await sleep(2000);
scope: 'global', process.exit(1);
}); }
const owner = await Db.collections.User.findOneBy({ globalRoleId: globalRole.id }); protected async initBinaryManager() {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
}
if (owner) return owner; protected async initExternalHooks() {
this.externalHooks = ExternalHooks();
await this.externalHooks.init();
}
const user = new User(); async finally(error: Error | undefined) {
if (inTest || this.id === 'start') return;
Object.assign(user, { ...this.defaultUserProps, globalRole }); if (Db.isInitialized) await Db.connection.destroy();
this.exit(error ? 1 : 0);
await Db.collections.User.save(user);
return Db.collections.User.findOneByOrFail({ globalRoleId: globalRole.id });
} }
} }

View file

@ -12,6 +12,7 @@ interface IResult {
}; };
executions: IExecutionResult[]; executions: IExecutionResult[];
} }
interface IExecutionResult { interface IExecutionResult {
workflowId: string; workflowId: string;
workflowName: string; workflowName: string;
@ -53,14 +54,3 @@ declare module 'json-diff' {
} }
export function diff(obj1: unknown, obj2: unknown, diffOptions: IDiffOptions): string; export function diff(obj1: unknown, obj2: unknown, diffOptions: IDiffOptions): string;
} }
type SmtpConfig = {
host: string;
port: number;
secure: boolean;
auth: {
user: string;
pass: string;
};
sender: string;
};

View file

@ -1,19 +1,12 @@
import Command, { flags } from '@oclif/command'; import { flags } from '@oclif/command';
import { LoggerProxy } from 'n8n-workflow';
import { UserSettings } from 'n8n-core';
import type { Logger } from '@/Logger';
import { getLogger } from '@/Logger';
import { audit } from '@/audit'; import { audit } from '@/audit';
import { RISK_CATEGORIES } from '@/audit/constants'; import { RISK_CATEGORIES } from '@/audit/constants';
import { CredentialTypes } from '@/CredentialTypes';
import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { InternalHooksManager } from '@/InternalHooksManager'; import { InternalHooksManager } from '@/InternalHooksManager';
import config from '@/config'; import config from '@/config';
import * as Db from '@/Db';
import type { Risk } from '@/audit/types'; import type { Risk } from '@/audit/types';
import { BaseCommand } from './BaseCommand';
export class SecurityAudit extends Command { export class SecurityAudit extends BaseCommand {
static description = 'Generate a security audit report for this n8n instance'; static description = 'Generate a security audit report for this n8n instance';
static examples = [ static examples = [
@ -35,11 +28,7 @@ export class SecurityAudit extends Command {
}), }),
}; };
logger: Logger;
async run() { async run() {
await this.init();
const { flags: auditFlags } = this.parse(SecurityAudit); const { flags: auditFlags } = this.parse(SecurityAudit);
const categories = const categories =
@ -70,38 +59,8 @@ export class SecurityAudit extends Command {
void InternalHooksManager.getInstance().onAuditGeneratedViaCli(); void InternalHooksManager.getInstance().onAuditGeneratedViaCli();
} }
async init() {
await Db.init();
this.initLogger();
await this.initInternalHooksManager();
}
initLogger() {
this.logger = getLogger();
LoggerProxy.init(this.logger);
}
async initInternalHooksManager(): Promise<void> {
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
const nodeTypes = NodeTypes(loadNodesAndCredentials);
CredentialTypes(loadNodesAndCredentials);
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
}
async catch(error: Error) { async catch(error: Error) {
this.logger.error('Failed to generate security audit'); this.logger.error('Failed to generate security audit');
this.logger.error(error.message); this.logger.error(error.message);
this.exit(1);
}
async finally() {
this.exit();
} }
} }

View file

@ -1,10 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-console */
import { Command, flags } from '@oclif/command'; import { Command, flags } from '@oclif/command';
import type { DataSourceOptions as ConnectionOptions } from 'typeorm'; import type { DataSourceOptions as ConnectionOptions } from 'typeorm';
import { DataSource as Connection } from 'typeorm'; import { DataSource as Connection } from 'typeorm';
import { LoggerProxy } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger'; import { getLogger } from '@/Logger';
import { getConnectionOptions } from '@/Db'; import { getConnectionOptions } from '@/Db';
import config from '@/config'; import config from '@/config';
@ -18,14 +15,15 @@ export class DbRevertMigrationCommand extends Command {
help: flags.help({ char: 'h' }), help: flags.help({ char: 'h' }),
}; };
async run() { protected logger = LoggerProxy.init(getLogger());
const logger = getLogger();
LoggerProxy.init(logger);
private connection: Connection;
async init() {
this.parse(DbRevertMigrationCommand); this.parse(DbRevertMigrationCommand);
}
let connection: Connection | undefined; async run() {
try {
const dbType = config.getEnv('database.type'); const dbType = config.getEnv('database.type');
const connectionOptions: ConnectionOptions = { const connectionOptions: ConnectionOptions = {
...getConnectionOptions(dbType), ...getConnectionOptions(dbType),
@ -35,19 +33,21 @@ export class DbRevertMigrationCommand extends Command {
dropSchema: false, dropSchema: false,
logging: ['query', 'error', 'schema'], logging: ['query', 'error', 'schema'],
}; };
connection = new Connection(connectionOptions);
await connection.initialize();
await connection.undoLastMigration();
await connection.destroy();
} catch (error) {
if (connection?.isInitialized) await connection.destroy();
console.error('Error reverting last migration. See log messages for details.'); this.connection = new Connection(connectionOptions);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument await this.connection.initialize();
logger.error(error.message); await this.connection.undoLastMigration();
this.exit(1); await this.connection.destroy();
} }
this.exit(); async catch(error: Error) {
this.logger.error('Error reverting last migration. See log messages for details.');
this.logger.error(error.message);
}
protected async finally(error: Error | undefined) {
if (this.connection?.isInitialized) await this.connection.destroy();
this.exit(error ? 1 : 0);
} }
} }

View file

@ -1,30 +1,20 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-console */
import { promises as fs } from 'fs'; import { promises as fs } from 'fs';
import { Command, flags } from '@oclif/command'; import { flags } from '@oclif/command';
import { BinaryDataManager, UserSettings, PLACEHOLDER_EMPTY_WORKFLOW_ID } from 'n8n-core'; import { PLACEHOLDER_EMPTY_WORKFLOW_ID } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow'; import type { IWorkflowBase } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow'; import { ExecutionBaseError } from 'n8n-workflow';
import * as ActiveExecutions from '@/ActiveExecutions'; import * as ActiveExecutions from '@/ActiveExecutions';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner'; import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { getLogger } from '@/Logger';
import config from '@/config';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper'; import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils'; import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events'; import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand';
export class Execute extends Command { export class Execute extends BaseCommand {
static description = '\nExecutes a given workflow'; static description = '\nExecutes a given workflow';
static examples = ['$ n8n execute --id=5', '$ n8n execute --file=workflow.json']; static examples = ['$ n8n execute --id=5', '$ n8n execute --file=workflow.json'];
@ -42,33 +32,26 @@ export class Execute extends Command {
}), }),
}; };
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async init() {
async run() { await super.init();
const logger = getLogger(); await this.initBinaryManager();
LoggerProxy.init(logger); await this.initExternalHooks();
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
// Add event handlers // Add event handlers
initEvents(); initEvents();
}
async run() {
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Execute); const { flags } = this.parse(Execute);
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init();
if (!flags.id && !flags.file) { if (!flags.id && !flags.file) {
console.info('Either option "--id" or "--file" have to be set!'); this.logger.info('Either option "--id" or "--file" have to be set!');
return; return;
} }
if (flags.id && flags.file) { if (flags.id && flags.file) {
console.info('Either "id" or "file" can be set never both!'); this.logger.info('Either "id" or "file" can be set never both!');
return; return;
} }
@ -82,7 +65,7 @@ export class Execute extends Command {
} catch (error) { } catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
if (error.code === 'ENOENT') { if (error.code === 'ENOENT') {
console.info(`The file "${flags.file}" could not be found.`); this.logger.info(`The file "${flags.file}" could not be found.`);
return; return;
} }
@ -96,22 +79,19 @@ export class Execute extends Command {
workflowData.nodes === undefined || workflowData.nodes === undefined ||
workflowData.connections === undefined workflowData.connections === undefined
) { ) {
console.info(`The file "${flags.file}" does not contain valid workflow data.`); this.logger.info(`The file "${flags.file}" does not contain valid workflow data.`);
return; return;
} }
workflowId = workflowData.id ?? PLACEHOLDER_EMPTY_WORKFLOW_ID; workflowId = workflowData.id ?? PLACEHOLDER_EMPTY_WORKFLOW_ID;
} }
// Wait till the database is ready
await startDbInitPromise;
if (flags.id) { if (flags.id) {
// Id of workflow is given // Id of workflow is given
workflowId = flags.id; workflowId = flags.id;
workflowData = await Db.collections.Workflow.findOneBy({ id: workflowId }); workflowData = await Db.collections.Workflow.findOneBy({ id: workflowId });
if (workflowData === null) { if (workflowData === null) {
console.info(`The workflow with the id "${workflowId}" does not exist.`); this.logger.info(`The workflow with the id "${workflowId}" does not exist.`);
process.exit(1); process.exit(1);
} }
} }
@ -120,34 +100,10 @@ export class Execute extends Command {
throw new Error('Failed to retrieve workflow data for requested workflow'); throw new Error('Failed to retrieve workflow data for requested workflow');
} }
// Make sure the settings exist
await UserSettings.prepareUserSettings();
// Wait till the n8n-packages have been read
await loadNodesAndCredentialsPromise;
NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes);
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
CredentialTypes(loadNodesAndCredentials);
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) { if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) {
workflowId = undefined; workflowId = undefined;
} }
try {
const startingNode = findCliWorkflowStart(workflowData.nodes); const startingNode = findCliWorkflowStart(workflowData.nodes);
const user = await getInstanceOwner(); const user = await getInstanceOwner();
@ -169,10 +125,10 @@ export class Execute extends Command {
} }
if (data.data.resultData.error) { if (data.data.resultData.error) {
console.info('Execution was NOT successful. See log message for details.'); this.logger.info('Execution was NOT successful. See log message for details.');
logger.info('Execution error:'); this.logger.info('Execution error:');
logger.info('===================================='); this.logger.info('====================================');
logger.info(JSON.stringify(data, null, 2)); this.logger.info(JSON.stringify(data, null, 2));
const { error } = data.data.resultData; const { error } = data.data.resultData;
// eslint-disable-next-line @typescript-eslint/no-throw-literal // eslint-disable-next-line @typescript-eslint/no-throw-literal
@ -186,16 +142,14 @@ export class Execute extends Command {
this.log('===================================='); this.log('====================================');
} }
this.log(JSON.stringify(data, null, 2)); this.log(JSON.stringify(data, null, 2));
} catch (e) {
console.error('Error executing workflow. See log messages for details.');
logger.error('\nExecution error:');
logger.info('====================================');
logger.error(e.message);
if (e.description) logger.error(e.description);
logger.error(e.stack);
this.exit(1);
} }
this.exit(); async catch(error: Error) {
this.logger.error('Error executing workflow. See log messages for details.');
this.logger.error('\nExecution error:');
this.logger.info('====================================');
this.logger.error(error.message);
if (error instanceof ExecutionBaseError) this.logger.error(error.description!);
this.logger.error(error.stack!);
} }
} }

View file

@ -1,45 +1,25 @@
/* eslint-disable @typescript-eslint/prefer-optional-chain */ /* eslint-disable @typescript-eslint/no-loop-func */
/* eslint-disable array-callback-return */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-async-promise-executor */
/* eslint-disable no-param-reassign */
/* eslint-disable @typescript-eslint/unbound-method */
/* eslint-disable no-console */
import fs from 'fs'; import fs from 'fs';
import { Command, flags } from '@oclif/command'; import { flags } from '@oclif/command';
import { BinaryDataManager, UserSettings } from 'n8n-core';
import type { ITaskData } from 'n8n-workflow'; import type { ITaskData } from 'n8n-workflow';
import { LoggerProxy, sleep } from 'n8n-workflow'; import { sleep } from 'n8n-workflow';
import { sep } from 'path'; import { sep } from 'path';
import { diff } from 'json-diff'; import { diff } from 'json-diff';
import pick from 'lodash.pick'; import pick from 'lodash.pick';
import { getLogger } from '@/Logger';
import * as ActiveExecutions from '@/ActiveExecutions'; import * as ActiveExecutions from '@/ActiveExecutions';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager';
import { WorkflowRunner } from '@/WorkflowRunner'; import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces'; import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
import config from '@/config';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper'; import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils'; import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events'; import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand';
const re = /\d+/; const re = /\d+/;
export class ExecuteBatch extends Command { export class ExecuteBatch extends BaseCommand {
static description = '\nExecutes multiple workflows once'; static description = '\nExecutes multiple workflows once';
static cancelled = false; static cancelled = false;
@ -115,7 +95,6 @@ export class ExecuteBatch extends Command {
* Gracefully handles exit. * Gracefully handles exit.
* @param {boolean} skipExit Whether to skip exit or number according to received signal * @param {boolean} skipExit Whether to skip exit or number according to received signal
*/ */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
static async stopProcess(skipExit: boolean | number = false) { static async stopProcess(skipExit: boolean | number = false) {
if (ExecuteBatch.cancelled) { if (ExecuteBatch.cancelled) {
process.exit(0); process.exit(0);
@ -123,16 +102,13 @@ export class ExecuteBatch extends Command {
ExecuteBatch.cancelled = true; ExecuteBatch.cancelled = true;
const activeExecutionsInstance = ActiveExecutions.getInstance(); const activeExecutionsInstance = ActiveExecutions.getInstance();
const stopPromises = activeExecutionsInstance.getActiveExecutions().map(async (execution) => { const stopPromises = activeExecutionsInstance
// eslint-disable-next-line @typescript-eslint/no-floating-promises .getActiveExecutions()
activeExecutionsInstance.stopExecution(execution.id); .map(async (execution) => activeExecutionsInstance.stopExecution(execution.id));
});
await Promise.allSettled(stopPromises); await Promise.allSettled(stopPromises);
setTimeout(() => { setTimeout(() => process.exit(0), 30000);
process.exit(0);
}, 30000);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions(); let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
@ -144,7 +120,6 @@ export class ExecuteBatch extends Command {
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`); console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
}); });
} }
// eslint-disable-next-line no-await-in-loop
await sleep(500); await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions(); executingWorkflows = activeExecutionsInstance.getActiveExecutions();
} }
@ -155,13 +130,11 @@ export class ExecuteBatch extends Command {
} }
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types private formatJsonOutput(data: object) {
formatJsonOutput(data: object) {
return JSON.stringify(data, null, 2); return JSON.stringify(data, null, 2);
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types private shouldBeConsideredAsWarning(errorMessage: string) {
shouldBeConsideredAsWarning(errorMessage: string) {
const warningStrings = [ const warningStrings = [
'refresh token is invalid', 'refresh token is invalid',
'unable to connect to', 'unable to connect to',
@ -174,7 +147,6 @@ export class ExecuteBatch extends Command {
'status code 401', 'status code 401',
]; ];
// eslint-disable-next-line no-param-reassign
errorMessage = errorMessage.toLowerCase(); errorMessage = errorMessage.toLowerCase();
for (let i = 0; i < warningStrings.length; i++) { for (let i = 0; i < warningStrings.length; i++) {
@ -186,21 +158,23 @@ export class ExecuteBatch extends Command {
return false; return false;
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async init() {
async run() { await super.init();
process.once('SIGTERM', ExecuteBatch.stopProcess); await this.initBinaryManager();
process.once('SIGINT', ExecuteBatch.stopProcess); await this.initExternalHooks();
const logger = getLogger();
LoggerProxy.init(logger);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExecuteBatch);
// Add event handlers // Add event handlers
initEvents(); initEvents();
}
async run() {
// eslint-disable-next-line @typescript-eslint/unbound-method
process.once('SIGTERM', ExecuteBatch.stopProcess);
// eslint-disable-next-line @typescript-eslint/unbound-method
process.once('SIGINT', ExecuteBatch.stopProcess);
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExecuteBatch);
ExecuteBatch.debug = flags.debug; ExecuteBatch.debug = flags.debug;
ExecuteBatch.concurrency = flags.concurrency || 1; ExecuteBatch.concurrency = flags.concurrency || 1;
@ -277,23 +251,8 @@ export class ExecuteBatch extends Command {
ExecuteBatch.shallow = true; ExecuteBatch.shallow = true;
} }
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init();
// Make sure the settings exist
await UserSettings.prepareUserSettings();
// Wait till the database is ready
await startDbInitPromise;
ExecuteBatch.instanceOwner = await getInstanceOwner(); ExecuteBatch.instanceOwner = await getInstanceOwner();
let allWorkflows;
const query = Db.collections.Workflow.createQueryBuilder('workflows'); const query = Db.collections.Workflow.createQueryBuilder('workflows');
if (ids.length > 0) { if (ids.length > 0) {
@ -304,33 +263,12 @@ export class ExecuteBatch extends Command {
query.andWhere('workflows.id not in (:...skipIds)', { skipIds }); query.andWhere('workflows.id not in (:...skipIds)', { skipIds });
} }
// eslint-disable-next-line prefer-const const allWorkflows = (await query.getMany()) as IWorkflowDb[];
allWorkflows = (await query.getMany()) as IWorkflowDb[];
if (ExecuteBatch.debug) { if (ExecuteBatch.debug) {
process.stdout.write(`Found ${allWorkflows.length} workflows to execute.\n`); process.stdout.write(`Found ${allWorkflows.length} workflows to execute.\n`);
} }
// Wait till the n8n-packages have been read
await loadNodesAndCredentialsPromise;
NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes);
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
CredentialTypes(loadNodesAndCredentials);
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
// Send a shallow copy of allWorkflows so we still have all workflow data. // Send a shallow copy of allWorkflows so we still have all workflow data.
const results = await this.runTests([...allWorkflows]); const results = await this.runTests([...allWorkflows]);
@ -348,7 +286,6 @@ export class ExecuteBatch extends Command {
failedWorkflowIds.includes(workflow.id), failedWorkflowIds.includes(workflow.id),
); );
// eslint-disable-next-line no-await-in-loop
const retryResults = await this.runTests(newWorkflowList); const retryResults = await this.runTests(newWorkflowList);
this.mergeResults(results, retryResults); this.mergeResults(results, retryResults);
@ -389,7 +326,6 @@ export class ExecuteBatch extends Command {
this.exit(0); this.exit(0);
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
mergeResults(results: IResult, retryResults: IResult) { mergeResults(results: IResult, retryResults: IResult) {
if (retryResults.summary.successfulExecutions === 0) { if (retryResults.summary.successfulExecutions === 0) {
// Nothing to replace. // Nothing to replace.
@ -430,7 +366,7 @@ export class ExecuteBatch extends Command {
}); });
} }
async runTests(allWorkflows: IWorkflowDb[]): Promise<IResult> { private async runTests(allWorkflows: IWorkflowDb[]): Promise<IResult> {
const result: IResult = { const result: IResult = {
totalWorkflows: allWorkflows.length, totalWorkflows: allWorkflows.length,
summary: { summary: {
@ -475,7 +411,6 @@ export class ExecuteBatch extends Command {
this.updateStatus(); this.updateStatus();
} }
// eslint-disable-next-line @typescript-eslint/no-loop-func
await this.startThread(workflow).then((executionResult) => { await this.startThread(workflow).then((executionResult) => {
if (ExecuteBatch.debug) { if (ExecuteBatch.debug) {
ExecuteBatch.workflowExecutionsProgress[i].pop(); ExecuteBatch.workflowExecutionsProgress[i].pop();
@ -542,7 +477,6 @@ export class ExecuteBatch extends Command {
}); });
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
updateStatus() { updateStatus() {
if (ExecuteBatch.cancelled) { if (ExecuteBatch.cancelled) {
return; return;
@ -584,7 +518,6 @@ export class ExecuteBatch extends Command {
}); });
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
initializeLogs() { initializeLogs() {
process.stdout.write('**********************************************\n'); process.stdout.write('**********************************************\n');
process.stdout.write(' n8n test workflows\n'); process.stdout.write(' n8n test workflows\n');
@ -687,12 +620,13 @@ export class ExecuteBatch extends Command {
1000; 1000;
executionResult.finished = data?.finished !== undefined; executionResult.finished = data?.finished !== undefined;
if (data.data.resultData.error) { const resultError = data.data.resultData.error;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, no-prototype-builtins if (resultError) {
executionResult.error = data.data.resultData.error.hasOwnProperty('description') // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
executionResult.error = resultError.hasOwnProperty('description')
? // @ts-ignore ? // @ts-ignore
data.data.resultData.error.description resultError.description
: data.data.resultData.error.message; : resultError.message;
if (data.data.resultData.lastNodeExecuted !== undefined) { if (data.data.resultData.lastNodeExecuted !== undefined) {
executionResult.error += ` on node ${data.data.resultData.lastNodeExecuted}`; executionResult.error += ` on node ${data.data.resultData.lastNodeExecuted}`;
} }
@ -723,33 +657,29 @@ export class ExecuteBatch extends Command {
return; return;
} }
if ( const { capResults, ignoredProperties, keepOnlyProperties } =
nodeEdgeCases[nodeName] !== undefined && nodeEdgeCases[nodeName] || {};
nodeEdgeCases[nodeName].capResults !== undefined
) { if (capResults !== undefined) {
executionDataArray.splice(nodeEdgeCases[nodeName].capResults!); executionDataArray.splice(capResults);
} }
executionDataArray.map((executionData) => { executionDataArray.map((executionData) => {
if (executionData.json === undefined) { if (executionData.json === undefined) {
return; return;
} }
if (
nodeEdgeCases[nodeName] !== undefined && if (ignoredProperties !== undefined) {
nodeEdgeCases[nodeName].ignoredProperties !== undefined ignoredProperties.forEach(
) {
nodeEdgeCases[nodeName].ignoredProperties!.forEach(
(ignoredProperty) => delete executionData.json[ignoredProperty], (ignoredProperty) => delete executionData.json[ignoredProperty],
); );
} }
let keepOnlyFields = [] as string[]; let keepOnlyFields = [] as string[];
if ( if (keepOnlyProperties !== undefined) {
nodeEdgeCases[nodeName] !== undefined && keepOnlyFields = keepOnlyProperties;
nodeEdgeCases[nodeName].keepOnlyProperties !== undefined
) {
keepOnlyFields = nodeEdgeCases[nodeName].keepOnlyProperties!;
} }
executionData.json = executionData.json =
keepOnlyFields.length > 0 keepOnlyFields.length > 0
? pick(executionData.json, keepOnlyFields) ? pick(executionData.json, keepOnlyFields)
@ -857,8 +787,7 @@ export class ExecuteBatch extends Command {
} }
} }
} catch (e) { } catch (e) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access executionResult.error = `Workflow failed to execute: ${(e as Error).message}`;
executionResult.error = `Workflow failed to execute: ${e.message}`;
executionResult.executionStatus = 'error'; executionResult.executionStatus = 'error';
} }
clearTimeout(timeoutTimer); clearTimeout(timeoutTimer);

View file

@ -1,21 +1,13 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */ import { flags } from '@oclif/command';
/* eslint-disable @typescript-eslint/restrict-plus-operands */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-console */
import { Command, flags } from '@oclif/command';
import { Credentials, UserSettings } from 'n8n-core';
import type { IDataObject } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import fs from 'fs'; import fs from 'fs';
import path from 'path'; import path from 'path';
import { getLogger } from '@/Logger'; import { Credentials, UserSettings } from 'n8n-core';
import type { IDataObject } from 'n8n-workflow';
import * as Db from '@/Db'; import * as Db from '@/Db';
import type { ICredentialsDecryptedDb } from '@/Interfaces'; import type { ICredentialsDecryptedDb } from '@/Interfaces';
import { BaseCommand } from '../BaseCommand';
export class ExportCredentialsCommand extends Command { export class ExportCredentialsCommand extends BaseCommand {
static description = 'Export credentials'; static description = 'Export credentials';
static examples = [ static examples = [
@ -55,11 +47,7 @@ export class ExportCredentialsCommand extends Command {
}), }),
}; };
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async run() { async run() {
const logger = getLogger();
LoggerProxy.init(logger);
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExportCredentialsCommand); const { flags } = this.parse(ExportCredentialsCommand);
@ -70,52 +58,53 @@ export class ExportCredentialsCommand extends Command {
} }
if (!flags.all && !flags.id) { if (!flags.all && !flags.id) {
console.info('Either option "--all" or "--id" have to be set!'); this.logger.info('Either option "--all" or "--id" have to be set!');
return; return;
} }
if (flags.all && flags.id) { if (flags.all && flags.id) {
console.info('You should either use "--all" or "--id" but never both!'); this.logger.info('You should either use "--all" or "--id" but never both!');
return; return;
} }
if (flags.separate) { if (flags.separate) {
try { try {
if (!flags.output) { if (!flags.output) {
console.info('You must inform an output directory via --output when using --separate'); this.logger.info(
'You must inform an output directory via --output when using --separate',
);
return; return;
} }
if (fs.existsSync(flags.output)) { if (fs.existsSync(flags.output)) {
if (!fs.lstatSync(flags.output).isDirectory()) { if (!fs.lstatSync(flags.output).isDirectory()) {
console.info('The parameter --output must be a directory'); this.logger.info('The parameter --output must be a directory');
return; return;
} }
} else { } else {
fs.mkdirSync(flags.output, { recursive: true }); fs.mkdirSync(flags.output, { recursive: true });
} }
} catch (e) { } catch (e) {
console.error( this.logger.error(
'Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.', 'Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.',
); );
logger.error('\nFILESYSTEM ERROR'); this.logger.error('\nFILESYSTEM ERROR');
logger.info('===================================='); if (e instanceof Error) {
logger.error(e.message); this.logger.info('====================================');
logger.error(e.stack); this.logger.error(e.message);
this.exit(1); this.logger.error(e.stack!);
}
return;
} }
} else if (flags.output) { } else if (flags.output) {
if (fs.existsSync(flags.output)) { if (fs.existsSync(flags.output)) {
if (fs.lstatSync(flags.output).isDirectory()) { if (fs.lstatSync(flags.output).isDirectory()) {
console.info('The parameter --output must be a writeable file'); this.logger.info('The parameter --output must be a writeable file');
return; return;
} }
} }
} }
try {
await Db.init();
const findQuery: IDataObject = {}; const findQuery: IDataObject = {};
if (flags.id) { if (flags.id) {
findQuery.id = flags.id; findQuery.id = flags.id;
@ -146,28 +135,26 @@ export class ExportCredentialsCommand extends Command {
for (i = 0; i < credentials.length; i++) { for (i = 0; i < credentials.length; i++) {
fileContents = JSON.stringify(credentials[i], null, flags.pretty ? 2 : undefined); fileContents = JSON.stringify(credentials[i], null, flags.pretty ? 2 : undefined);
const filename = `${ const filename = `${
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion, @typescript-eslint/restrict-plus-operands
(flags.output!.endsWith(path.sep) ? flags.output! : flags.output + path.sep) + (flags.output!.endsWith(path.sep) ? flags.output! : flags.output + path.sep) +
credentials[i].id credentials[i].id
}.json`; }.json`;
fs.writeFileSync(filename, fileContents); fs.writeFileSync(filename, fileContents);
} }
console.info(`Successfully exported ${i} credentials.`); this.logger.info(`Successfully exported ${i} credentials.`);
} else { } else {
const fileContents = JSON.stringify(credentials, null, flags.pretty ? 2 : undefined); const fileContents = JSON.stringify(credentials, null, flags.pretty ? 2 : undefined);
if (flags.output) { if (flags.output) {
fs.writeFileSync(flags.output, fileContents); fs.writeFileSync(flags.output, fileContents);
console.info(`Successfully exported ${credentials.length} credentials.`); this.logger.info(`Successfully exported ${credentials.length} credentials.`);
} else { } else {
console.info(fileContents); this.logger.info(fileContents);
} }
} }
// Force exit as process won't exit using MySQL or Postgres.
process.exit(0);
} catch (error) {
console.error('Error exporting credentials. See log messages for details.');
logger.error(error.message);
this.exit(1);
} }
async catch(error: Error) {
this.logger.error('Error exporting credentials. See log messages for details.');
this.logger.error(error.message);
} }
} }

View file

@ -1,17 +1,11 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */ import { flags } from '@oclif/command';
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-console */
import { Command, flags } from '@oclif/command';
import type { IDataObject } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import fs from 'fs'; import fs from 'fs';
import path from 'path'; import path from 'path';
import { getLogger } from '@/Logger'; import type { IDataObject } from 'n8n-workflow';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { BaseCommand } from '../BaseCommand';
export class ExportWorkflowsCommand extends Command { export class ExportWorkflowsCommand extends BaseCommand {
static description = 'Export workflows'; static description = 'Export workflows';
static examples = [ static examples = [
@ -46,11 +40,7 @@ export class ExportWorkflowsCommand extends Command {
}), }),
}; };
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async run() { async run() {
const logger = getLogger();
LoggerProxy.init(logger);
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExportWorkflowsCommand); const { flags } = this.parse(ExportWorkflowsCommand);
@ -61,52 +51,53 @@ export class ExportWorkflowsCommand extends Command {
} }
if (!flags.all && !flags.id) { if (!flags.all && !flags.id) {
console.info('Either option "--all" or "--id" have to be set!'); this.logger.info('Either option "--all" or "--id" have to be set!');
return; return;
} }
if (flags.all && flags.id) { if (flags.all && flags.id) {
console.info('You should either use "--all" or "--id" but never both!'); this.logger.info('You should either use "--all" or "--id" but never both!');
return; return;
} }
if (flags.separate) { if (flags.separate) {
try { try {
if (!flags.output) { if (!flags.output) {
console.info('You must inform an output directory via --output when using --separate'); this.logger.info(
'You must inform an output directory via --output when using --separate',
);
return; return;
} }
if (fs.existsSync(flags.output)) { if (fs.existsSync(flags.output)) {
if (!fs.lstatSync(flags.output).isDirectory()) { if (!fs.lstatSync(flags.output).isDirectory()) {
console.info('The parameter --output must be a directory'); this.logger.info('The parameter --output must be a directory');
return; return;
} }
} else { } else {
fs.mkdirSync(flags.output, { recursive: true }); fs.mkdirSync(flags.output, { recursive: true });
} }
} catch (e) { } catch (e) {
console.error( this.logger.error(
'Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.', 'Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.',
); );
logger.error('\nFILESYSTEM ERROR'); this.logger.error('\nFILESYSTEM ERROR');
logger.info('===================================='); this.logger.info('====================================');
logger.error(e.message); if (e instanceof Error) {
logger.error(e.stack); this.logger.error(e.message);
this.logger.error(e.stack!);
}
this.exit(1); this.exit(1);
} }
} else if (flags.output) { } else if (flags.output) {
if (fs.existsSync(flags.output)) { if (fs.existsSync(flags.output)) {
if (fs.lstatSync(flags.output).isDirectory()) { if (fs.lstatSync(flags.output).isDirectory()) {
console.info('The parameter --output must be a writeable file'); this.logger.info('The parameter --output must be a writeable file');
return; return;
} }
} }
} }
try {
await Db.init();
const findQuery: IDataObject = {}; const findQuery: IDataObject = {};
if (flags.id) { if (flags.id) {
findQuery.id = flags.id; findQuery.id = flags.id;
@ -133,26 +124,24 @@ export class ExportWorkflowsCommand extends Command {
}.json`; }.json`;
fs.writeFileSync(filename, fileContents); fs.writeFileSync(filename, fileContents);
} }
console.info(`Successfully exported ${i} workflows.`); this.logger.info(`Successfully exported ${i} workflows.`);
} else { } else {
const fileContents = JSON.stringify(workflows, null, flags.pretty ? 2 : undefined); const fileContents = JSON.stringify(workflows, null, flags.pretty ? 2 : undefined);
if (flags.output) { if (flags.output) {
fs.writeFileSync(flags.output, fileContents); fs.writeFileSync(flags.output, fileContents);
console.info( this.logger.info(
`Successfully exported ${workflows.length} ${ `Successfully exported ${workflows.length} ${
workflows.length === 1 ? 'workflow.' : 'workflows.' workflows.length === 1 ? 'workflow.' : 'workflows.'
}`, }`,
); );
} else { } else {
console.info(fileContents); this.logger.info(fileContents);
} }
} }
// Force exit as process won't exit using MySQL or Postgres.
process.exit(0);
} catch (error) {
console.error('Error exporting workflows. See log messages for details.');
logger.error(error.message);
this.exit(1);
} }
async catch(error: Error) {
this.logger.error('Error exporting workflows. See log messages for details.');
this.logger.error(error.message);
} }
} }

View file

@ -1,18 +1,8 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */ import { flags } from '@oclif/command';
/* eslint-disable no-restricted-syntax */ import { Credentials } from 'n8n-core';
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable no-await-in-loop */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-console */
import { Command, flags } from '@oclif/command';
import { Credentials, UserSettings } from 'n8n-core';
import { LoggerProxy } from 'n8n-workflow';
import fs from 'fs'; import fs from 'fs';
import glob from 'fast-glob'; import glob from 'fast-glob';
import type { EntityManager } from 'typeorm'; import type { EntityManager } from 'typeorm';
import { getLogger } from '@/Logger';
import config from '@/config'; import config from '@/config';
import * as Db from '@/Db'; import * as Db from '@/Db';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
@ -20,11 +10,11 @@ import { SharedCredentials } from '@db/entities/SharedCredentials';
import type { Role } from '@db/entities/Role'; import type { Role } from '@db/entities/Role';
import { CredentialsEntity } from '@db/entities/CredentialsEntity'; import { CredentialsEntity } from '@db/entities/CredentialsEntity';
import { disableAutoGeneratedIds } from '@db/utils/commandHelpers'; import { disableAutoGeneratedIds } from '@db/utils/commandHelpers';
import { BaseCommand, UM_FIX_INSTRUCTION } from '../BaseCommand';
import type { ICredentialsEncrypted } from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
const FIX_INSTRUCTION = export class ImportCredentialsCommand extends BaseCommand {
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';
export class ImportCredentialsCommand extends Command {
static description = 'Import credentials'; static description = 'Import credentials';
static examples = [ static examples = [
@ -48,25 +38,28 @@ export class ImportCredentialsCommand extends Command {
}), }),
}; };
ownerCredentialRole: Role; private ownerCredentialRole: Role;
transactionManager: EntityManager; private transactionManager: EntityManager;
async init() {
disableAutoGeneratedIds(CredentialsEntity);
await super.init();
}
async run(): Promise<void> { async run(): Promise<void> {
const logger = getLogger(); // eslint-disable-next-line @typescript-eslint/no-shadow
LoggerProxy.init(logger);
const { flags } = this.parse(ImportCredentialsCommand); const { flags } = this.parse(ImportCredentialsCommand);
if (!flags.input) { if (!flags.input) {
console.info('An input file or directory with --input must be provided'); this.logger.info('An input file or directory with --input must be provided');
return; return;
} }
if (flags.separate) { if (flags.separate) {
if (fs.existsSync(flags.input)) { if (fs.existsSync(flags.input)) {
if (!fs.lstatSync(flags.input).isDirectory()) { if (!fs.lstatSync(flags.input).isDirectory()) {
console.info('The argument to --input must be a directory'); this.logger.info('The argument to --input must be a directory');
return; return;
} }
} }
@ -74,17 +67,10 @@ export class ImportCredentialsCommand extends Command {
let totalImported = 0; let totalImported = 0;
try {
disableAutoGeneratedIds(CredentialsEntity);
await Db.init();
await this.initOwnerCredentialRole(); await this.initOwnerCredentialRole();
const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner(); const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner();
// Make sure the settings exist const encryptionKey = this.userSettings.encryptionKey;
await UserSettings.prepareUserSettings();
const encryptionKey = await UserSettings.getEncryptionKey();
if (flags.separate) { if (flags.separate) {
let { input: inputPath } = flags; let { input: inputPath } = flags;
@ -103,7 +89,9 @@ export class ImportCredentialsCommand extends Command {
await Db.getConnection().transaction(async (transactionManager) => { await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
for (const file of files) { for (const file of files) {
const credential = JSON.parse(fs.readFileSync(file, { encoding: 'utf8' })); const credential = jsonParse<ICredentialsEncrypted>(
fs.readFileSync(file, { encoding: 'utf8' }),
);
if (typeof credential.data === 'object') { if (typeof credential.data === 'object') {
// plain data / decrypted input. Should be encrypted first. // plain data / decrypted input. Should be encrypted first.
@ -115,10 +103,12 @@ export class ImportCredentialsCommand extends Command {
}); });
this.reportSuccess(totalImported); this.reportSuccess(totalImported);
process.exit(); return;
} }
const credentials = JSON.parse(fs.readFileSync(flags.input, { encoding: 'utf8' })); const credentials = jsonParse<ICredentialsEncrypted[]>(
fs.readFileSync(flags.input, { encoding: 'utf8' }),
);
totalImported = credentials.length; totalImported = credentials.length;
@ -140,16 +130,19 @@ export class ImportCredentialsCommand extends Command {
}); });
this.reportSuccess(totalImported); this.reportSuccess(totalImported);
process.exit();
} catch (error) {
console.error('An error occurred while importing credentials. See log messages for details.');
if (error instanceof Error) logger.error(error.message);
this.exit(1);
} }
async catch(error: Error) {
this.logger.error(
'An error occurred while importing credentials. See log messages for details.',
);
this.logger.error(error.message);
} }
private reportSuccess(total: number) { private reportSuccess(total: number) {
console.info(`Successfully imported ${total} ${total === 1 ? 'credential.' : 'credentials.'}`); this.logger.info(
`Successfully imported ${total} ${total === 1 ? 'credential.' : 'credentials.'}`,
);
} }
private async initOwnerCredentialRole() { private async initOwnerCredentialRole() {
@ -158,7 +151,7 @@ export class ImportCredentialsCommand extends Command {
}); });
if (!ownerCredentialRole) { if (!ownerCredentialRole) {
throw new Error(`Failed to find owner credential role. ${FIX_INSTRUCTION}`); throw new Error(`Failed to find owner credential role. ${UM_FIX_INSTRUCTION}`);
} }
this.ownerCredentialRole = ownerCredentialRole; this.ownerCredentialRole = ownerCredentialRole;
@ -169,7 +162,7 @@ export class ImportCredentialsCommand extends Command {
await this.transactionManager.upsert( await this.transactionManager.upsert(
SharedCredentials, SharedCredentials,
{ {
credentialsId: result.identifiers[0].id, credentialsId: result.identifiers[0].id as string,
userId: user.id, userId: user.id,
roleId: this.ownerCredentialRole.id, roleId: this.ownerCredentialRole.id,
}, },
@ -192,7 +185,7 @@ export class ImportCredentialsCommand extends Command {
(await Db.collections.User.findOneBy({ globalRoleId: ownerGlobalRole.id })); (await Db.collections.User.findOneBy({ globalRoleId: ownerGlobalRole.id }));
if (!owner) { if (!owner) {
throw new Error(`Failed to find owner. ${FIX_INSTRUCTION}`); throw new Error(`Failed to find owner. ${UM_FIX_INSTRUCTION}`);
} }
return owner; return owner;

View file

@ -1,23 +1,10 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */ import { flags } from '@oclif/command';
/* eslint-disable no-restricted-syntax */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-loop-func */
/* eslint-disable no-await-in-loop */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable no-console */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Command, flags } from '@oclif/command';
import type { INode, INodeCredentialsDetails } from 'n8n-workflow'; import type { INode, INodeCredentialsDetails } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow'; import { jsonParse } from 'n8n-workflow';
import fs from 'fs'; import fs from 'fs';
import glob from 'fast-glob'; import glob from 'fast-glob';
import { UserSettings } from 'n8n-core';
import type { EntityManager } from 'typeorm'; import type { EntityManager } from 'typeorm';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
import { getLogger } from '@/Logger';
import config from '@/config'; import config from '@/config';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { SharedWorkflow } from '@db/entities/SharedWorkflow'; import { SharedWorkflow } from '@db/entities/SharedWorkflow';
@ -28,9 +15,7 @@ import { setTagsForImport } from '@/TagHelpers';
import type { ICredentialsDb, IWorkflowToImport } from '@/Interfaces'; import type { ICredentialsDb, IWorkflowToImport } from '@/Interfaces';
import { disableAutoGeneratedIds } from '@db/utils/commandHelpers'; import { disableAutoGeneratedIds } from '@db/utils/commandHelpers';
import { replaceInvalidCredentials } from '@/WorkflowHelpers'; import { replaceInvalidCredentials } from '@/WorkflowHelpers';
import { BaseCommand, UM_FIX_INSTRUCTION } from '../BaseCommand';
const FIX_INSTRUCTION =
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';
function assertHasWorkflowsToImport(workflows: unknown): asserts workflows is IWorkflowToImport[] { function assertHasWorkflowsToImport(workflows: unknown): asserts workflows is IWorkflowToImport[] {
if (!Array.isArray(workflows)) { if (!Array.isArray(workflows)) {
@ -50,7 +35,7 @@ function assertHasWorkflowsToImport(workflows: unknown): asserts workflows is IW
} }
} }
export class ImportWorkflowsCommand extends Command { export class ImportWorkflowsCommand extends BaseCommand {
static description = 'Import workflows'; static description = 'Import workflows';
static examples = [ static examples = [
@ -74,40 +59,36 @@ export class ImportWorkflowsCommand extends Command {
}), }),
}; };
ownerWorkflowRole: Role; private ownerWorkflowRole: Role;
transactionManager: EntityManager; private transactionManager: EntityManager;
async init() {
disableAutoGeneratedIds(WorkflowEntity);
await super.init();
}
async run(): Promise<void> { async run(): Promise<void> {
const logger = getLogger(); // eslint-disable-next-line @typescript-eslint/no-shadow
LoggerProxy.init(logger);
const { flags } = this.parse(ImportWorkflowsCommand); const { flags } = this.parse(ImportWorkflowsCommand);
if (!flags.input) { if (!flags.input) {
console.info('An input file or directory with --input must be provided'); this.logger.info('An input file or directory with --input must be provided');
return; return;
} }
if (flags.separate) { if (flags.separate) {
if (fs.existsSync(flags.input)) { if (fs.existsSync(flags.input)) {
if (!fs.lstatSync(flags.input).isDirectory()) { if (!fs.lstatSync(flags.input).isDirectory()) {
console.info('The argument to --input must be a directory'); this.logger.info('The argument to --input must be a directory');
return; return;
} }
} }
} }
try {
disableAutoGeneratedIds(WorkflowEntity);
await Db.init();
await this.initOwnerWorkflowRole(); await this.initOwnerWorkflowRole();
const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner(); const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner();
// Make sure the settings exist
await UserSettings.prepareUserSettings();
const credentials = await Db.collections.Credentials.find(); const credentials = await Db.collections.Credentials.find();
const tags = await Db.collections.Tag.find(); const tags = await Db.collections.Tag.find();
@ -126,12 +107,14 @@ export class ImportWorkflowsCommand extends Command {
}); });
totalImported = files.length; totalImported = files.length;
console.info(`Importing ${totalImported} workflows...`); this.logger.info(`Importing ${totalImported} workflows...`);
await Db.getConnection().transaction(async (transactionManager) => { await Db.getConnection().transaction(async (transactionManager) => {
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
for (const file of files) { for (const file of files) {
const workflow = JSON.parse(fs.readFileSync(file, { encoding: 'utf8' })); const workflow = jsonParse<IWorkflowToImport>(
fs.readFileSync(file, { encoding: 'utf8' }),
);
if (credentials.length > 0) { if (credentials.length > 0) {
workflow.nodes.forEach((node: INode) => { workflow.nodes.forEach((node: INode) => {
@ -156,7 +139,9 @@ export class ImportWorkflowsCommand extends Command {
process.exit(); process.exit();
} }
const workflows = JSON.parse(fs.readFileSync(flags.input, { encoding: 'utf8' })); const workflows = jsonParse<IWorkflowToImport[]>(
fs.readFileSync(flags.input, { encoding: 'utf8' }),
);
assertHasWorkflowsToImport(workflows); assertHasWorkflowsToImport(workflows);
@ -183,7 +168,7 @@ export class ImportWorkflowsCommand extends Command {
try { try {
await replaceInvalidCredentials(workflow as unknown as WorkflowEntity); await replaceInvalidCredentials(workflow as unknown as WorkflowEntity);
} catch (error) { } catch (error) {
console.log(error); this.logger.error('Failed to replace invalid credential', error as Error);
} }
} }
if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) { if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
@ -195,16 +180,15 @@ export class ImportWorkflowsCommand extends Command {
}); });
this.reportSuccess(totalImported); this.reportSuccess(totalImported);
process.exit();
} catch (error) {
console.error('An error occurred while importing workflows. See log messages for details.');
if (error instanceof Error) logger.error(error.message);
this.exit(1);
} }
async catch(error: Error) {
this.logger.error('An error occurred while importing workflows. See log messages for details.');
this.logger.error(error.message);
} }
private reportSuccess(total: number) { private reportSuccess(total: number) {
console.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`); this.logger.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`);
} }
private async initOwnerWorkflowRole() { private async initOwnerWorkflowRole() {
@ -213,7 +197,7 @@ export class ImportWorkflowsCommand extends Command {
}); });
if (!ownerWorkflowRole) { if (!ownerWorkflowRole) {
throw new Error(`Failed to find owner workflow role. ${FIX_INSTRUCTION}`); throw new Error(`Failed to find owner workflow role. ${UM_FIX_INSTRUCTION}`);
} }
this.ownerWorkflowRole = ownerWorkflowRole; this.ownerWorkflowRole = ownerWorkflowRole;
@ -224,7 +208,7 @@ export class ImportWorkflowsCommand extends Command {
await this.transactionManager.upsert( await this.transactionManager.upsert(
SharedWorkflow, SharedWorkflow,
{ {
workflowId: result.identifiers[0].id, workflowId: result.identifiers[0].id as string,
userId: user.id, userId: user.id,
roleId: this.ownerWorkflowRole.id, roleId: this.ownerWorkflowRole.id,
}, },
@ -247,7 +231,7 @@ export class ImportWorkflowsCommand extends Command {
(await Db.collections.User.findOneBy({ globalRoleId: ownerGlobalRole?.id })); (await Db.collections.User.findOneBy({ globalRoleId: ownerGlobalRole?.id }));
if (!owner) { if (!owner) {
throw new Error(`Failed to find owner. ${FIX_INSTRUCTION}`); throw new Error(`Failed to find owner. ${UM_FIX_INSTRUCTION}`);
} }
return owner; return owner;

View file

@ -7,14 +7,16 @@ export class Reset extends BaseCommand {
static description = '\nResets the database to the default ldap state'; static description = '\nResets the database to the default ldap state';
async run(): Promise<void> { async run(): Promise<void> {
const ldapIdentities = await Db.collections.AuthIdentity.find({ // eslint-disable-next-line @typescript-eslint/naming-convention
const { AuthIdentity, AuthProviderSyncHistory, Settings, User } = Db.collections;
const ldapIdentities = await AuthIdentity.find({
where: { providerType: 'ldap' }, where: { providerType: 'ldap' },
select: ['userId'], select: ['userId'],
}); });
await Db.collections.AuthProviderSyncHistory.delete({ providerType: 'ldap' }); await AuthProviderSyncHistory.delete({ providerType: 'ldap' });
await Db.collections.AuthIdentity.delete({ providerType: 'ldap' }); await AuthIdentity.delete({ providerType: 'ldap' });
await Db.collections.User.delete({ id: In(ldapIdentities.map((i) => i.userId)) }); await User.delete({ id: In(ldapIdentities.map((i) => i.userId)) });
await Db.collections.Settings.delete({ key: LDAP_FEATURE_NAME }); await Settings.delete({ key: LDAP_FEATURE_NAME });
this.logger.info('Successfully reset the database to default ldap state.'); this.logger.info('Successfully reset the database to default ldap state.');
} }
@ -22,6 +24,5 @@ export class Reset extends BaseCommand {
async catch(error: Error): Promise<void> { async catch(error: Error): Promise<void> {
this.logger.error('Error resetting database. See log messages for details.'); this.logger.error('Error resetting database. See log messages for details.');
this.logger.error(error.message); this.logger.error(error.message);
this.exit(1);
} }
} }

View file

@ -1,42 +1,25 @@
import { Command } from '@oclif/command';
import { LoggerProxy } from 'n8n-workflow';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { getLogger } from '@/Logger';
import { SETTINGS_LICENSE_CERT_KEY } from '@/constants'; import { SETTINGS_LICENSE_CERT_KEY } from '@/constants';
import { BaseCommand } from '../BaseCommand';
export class ClearLicenseCommand extends Command { export class ClearLicenseCommand extends BaseCommand {
static description = 'Clear license'; static description = 'Clear license';
static examples = ['$ n8n clear:license']; static examples = ['$ n8n clear:license'];
async run() { async run() {
const logger = getLogger(); this.logger.info('Clearing license from database.');
LoggerProxy.init(logger);
try {
await Db.init();
console.info('Clearing license from database.');
await Db.collections.Settings.delete({ await Db.collections.Settings.delete({
key: SETTINGS_LICENSE_CERT_KEY, key: SETTINGS_LICENSE_CERT_KEY,
}); });
console.info('Done. Restart n8n to take effect.'); this.logger.info('Done. Restart n8n to take effect.');
} catch (e: unknown) {
console.error('Error updating database. See log messages for details.');
logger.error('\nGOT ERROR');
logger.info('====================================');
if (e instanceof Error) {
logger.error(e.message);
if (e.stack) {
logger.error(e.stack);
}
}
this.exit(1);
} }
this.exit(); async catch(error: Error) {
this.logger.error('Error updating database. See log messages for details.');
this.logger.error('\nGOT ERROR');
this.logger.info('====================================');
this.logger.error(error.message);
this.logger.error(error.stack!);
} }
} }

View file

@ -1,12 +1,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ import { flags } from '@oclif/command';
/* eslint-disable no-console */
import { Command, flags } from '@oclif/command';
import type { IDataObject } from 'n8n-workflow'; import type { IDataObject } from 'n8n-workflow';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { BaseCommand } from '../BaseCommand';
export class ListWorkflowCommand extends Command { export class ListWorkflowCommand extends BaseCommand {
static description = '\nList workflows'; static description = '\nList workflows';
static examples = [ static examples = [
@ -25,7 +22,6 @@ export class ListWorkflowCommand extends Command {
}), }),
}; };
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async run() { async run() {
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ListWorkflowCommand); const { flags } = this.parse(ListWorkflowCommand);
@ -34,9 +30,6 @@ export class ListWorkflowCommand extends Command {
this.error('The --active flag has to be passed using true or false'); this.error('The --active flag has to be passed using true or false');
} }
try {
await Db.init();
const findQuery: IDataObject = {}; const findQuery: IDataObject = {};
if (flags.active !== undefined) { if (flags.active !== undefined) {
findQuery.active = flags.active === 'true'; findQuery.active = flags.active === 'true';
@ -44,18 +37,16 @@ export class ListWorkflowCommand extends Command {
const workflows = await Db.collections.Workflow.find(findQuery); const workflows = await Db.collections.Workflow.find(findQuery);
if (flags.onlyId) { if (flags.onlyId) {
workflows.forEach((workflow) => console.log(workflow.id)); workflows.forEach((workflow) => this.logger.info(workflow.id));
} else { } else {
workflows.forEach((workflow) => console.log(`${workflow.id}|${workflow.name}`)); workflows.forEach((workflow) => this.logger.info(`${workflow.id}|${workflow.name}`));
} }
} catch (e) {
console.error('\nGOT ERROR');
console.log('====================================');
console.error(e.message);
console.error(e.stack);
this.exit(1);
} }
this.exit(); async catch(error: Error) {
this.logger.error('\nGOT ERROR');
this.logger.error('====================================');
this.logger.error(error.message);
this.logger.error(error.stack!);
} }
} }

View file

@ -6,39 +6,31 @@ import path from 'path';
import { mkdir } from 'fs/promises'; import { mkdir } from 'fs/promises';
import { createReadStream, createWriteStream, existsSync } from 'fs'; import { createReadStream, createWriteStream, existsSync } from 'fs';
import localtunnel from 'localtunnel'; import localtunnel from 'localtunnel';
import { BinaryDataManager, TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core'; import { TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core';
import { Command, flags } from '@oclif/command'; import { flags } from '@oclif/command';
import stream from 'stream'; import stream from 'stream';
import replaceStream from 'replacestream'; import replaceStream from 'replacestream';
import { promisify } from 'util'; import { promisify } from 'util';
import glob from 'fast-glob'; import glob from 'fast-glob';
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow'; import { LoggerProxy, sleep, jsonParse } from 'n8n-workflow';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import config from '@/config'; import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions'; import * as ActiveExecutions from '@/ActiveExecutions';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner'; import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers'; import * as GenericHelpers from '@/GenericHelpers';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager'; import { InternalHooksManager } from '@/InternalHooksManager';
import * as Server from '@/Server'; import * as Server from '@/Server';
import * as TestWebhooks from '@/TestWebhooks'; import * as TestWebhooks from '@/TestWebhooks';
import { WaitTracker } from '@/WaitTracker'; import { WaitTracker } from '@/WaitTracker';
import { getLogger } from '@/Logger';
import { getAllInstalledPackages } from '@/CommunityNodes/packageModel'; import { getAllInstalledPackages } from '@/CommunityNodes/packageModel';
import { handleLdapInit } from '@/Ldap/helpers'; import { handleLdapInit } from '@/Ldap/helpers';
import { initErrorHandling } from '@/ErrorReporting';
import * as CrashJournal from '@/CrashJournal';
import { createPostHogLoadingScript } from '@/telemetry/scripts'; import { createPostHogLoadingScript } from '@/telemetry/scripts';
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants'; import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
import { eventBus } from '../eventbus'; import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open'); const open = require('open');
@ -46,21 +38,7 @@ const pipeline = promisify(stream.pipeline);
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined; let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
const exitWithCrash = async (message: string, error: unknown) => { export class Start extends BaseCommand {
ErrorReporter.error(new Error(message, { cause: error }), { level: 'fatal' });
await sleep(2000);
process.exit(1);
};
const exitSuccessFully = async () => {
try {
await CrashJournal.cleanup();
} finally {
process.exit();
}
};
export class Start extends Command {
static description = 'Starts n8n. Makes Web-UI available and starts active workflows'; static description = 'Starts n8n. Makes Web-UI available and starts active workflows';
static examples = [ static examples = [
@ -89,8 +67,7 @@ export class Start extends Command {
/** /**
* Opens the UI in browser * Opens the UI in browser
*/ */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types private openBrowser() {
static openBrowser() {
const editorUrl = GenericHelpers.getBaseUrl(); const editorUrl = GenericHelpers.getBaseUrl();
// eslint-disable-next-line @typescript-eslint/no-unused-vars // eslint-disable-next-line @typescript-eslint/no-unused-vars
@ -106,22 +83,20 @@ export class Start extends Command {
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
* get removed. * get removed.
*/ */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async stopProcess() {
static async stopProcess() { this.logger.info('\nStopping n8n...');
getLogger().info('\nStopping n8n...');
try { try {
// Stop with trying to activate workflows that could not be activated // Stop with trying to activate workflows that could not be activated
activeWorkflowRunner?.removeAllQueuedWorkflowActivations(); activeWorkflowRunner?.removeAllQueuedWorkflowActivations();
const externalHooks = ExternalHooks(); await this.externalHooks.run('n8n.stop', []);
await externalHooks.run('n8n.stop', []);
setTimeout(async () => { setTimeout(async () => {
// In case that something goes wrong with shutdown we // In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what // kill after max. 30 seconds no matter what
console.log('process exited after 30s'); console.log('process exited after 30s');
await exitSuccessFully(); await this.exitSuccessFully();
}, 30000); }, 30000);
await InternalHooksManager.getInstance().onN8nStop(); await InternalHooksManager.getInstance().onN8nStop();
@ -162,13 +137,13 @@ export class Start extends Command {
//finally shut down Event Bus //finally shut down Event Bus
await eventBus.close(); await eventBus.close();
} catch (error) { } catch (error) {
await exitWithCrash('There was an error shutting down n8n.', error); await this.exitWithCrash('There was an error shutting down n8n.', error);
} }
await exitSuccessFully(); await this.exitSuccessFully();
} }
static async generateStaticAssets() { private async generateStaticAssets() {
// Read the index file and replace the path placeholder // Read the index file and replace the path placeholder
const n8nPath = config.getEnv('path'); const n8nPath = config.getEnv('path');
const hooksUrls = config.getEnv('externalFrontendHooksUrls'); const hooksUrls = config.getEnv('externalFrontendHooksUrls');
@ -223,38 +198,27 @@ export class Start extends Command {
await Promise.all(files.map(compileFile)); await Promise.all(files.map(compileFile));
} }
async run() { async init() {
// Make sure that n8n shuts down gracefully if possible // Make sure that n8n shuts down gracefully if possible
process.once('SIGTERM', Start.stopProcess); process.once('SIGTERM', this.stopProcess);
process.once('SIGINT', Start.stopProcess); process.once('SIGINT', this.stopProcess);
const logger = getLogger(); await this.initCrashJournal();
LoggerProxy.init(logger); await super.init();
logger.info('Initializing n8n process'); this.logger.info('Initializing n8n process');
await initErrorHandling(); await this.initBinaryManager();
await CrashJournal.init(); await this.initExternalHooks();
if (!config.getEnv('endpoints.disableUi')) {
await this.generateStaticAssets();
}
}
async run() {
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Start); const { flags } = this.parse(Start);
try {
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Start directly with the init of the database to improve startup time
await Db.init().catch(async (error: Error) =>
exitWithCrash('There was an error initializing DB', error),
);
// Make sure the settings exist
const userSettings = await UserSettings.prepareUserSettings();
if (!config.getEnv('userManagement.jwtSecret')) { if (!config.getEnv('userManagement.jwtSecret')) {
// If we don't have a JWT secret set, generate // If we don't have a JWT secret set, generate
// one based and save to config. // one based and save to config.
@ -269,18 +233,7 @@ export class Start extends Command {
config.set('userManagement.jwtSecret', createHash('sha256').update(baseKey).digest('hex')); config.set('userManagement.jwtSecret', createHash('sha256').update(baseKey).digest('hex'));
} }
if (!config.getEnv('endpoints.disableUi')) { await this.loadNodesAndCredentials.generateTypesForFrontend();
await Start.generateStaticAssets();
}
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes);
await loadNodesAndCredentials.generateTypesForFrontend();
const installedPackages = await getAllInstalledPackages(); const installedPackages = await getAllInstalledPackages();
const missingPackages = new Set<{ const missingPackages = new Set<{
@ -289,7 +242,7 @@ export class Start extends Command {
}>(); }>();
installedPackages.forEach((installedPackage) => { installedPackages.forEach((installedPackage) => {
installedPackage.installedNodes.forEach((installedNode) => { installedPackage.installedNodes.forEach((installedNode) => {
if (!loadNodesAndCredentials.known.nodes[installedNode.type]) { if (!this.loadNodesAndCredentials.known.nodes[installedNode.type]) {
// Leave the list ready for installing in case we need. // Leave the list ready for installing in case we need.
missingPackages.add({ missingPackages.add({
packageName: installedPackage.packageName, packageName: installedPackage.packageName,
@ -304,7 +257,7 @@ export class Start extends Command {
// Load settings from database and set them to config. // Load settings from database and set them to config.
const databaseSettings = await Db.collections.Settings.findBy({ loadOnStartup: true }); const databaseSettings = await Db.collections.Settings.findBy({ loadOnStartup: true });
databaseSettings.forEach((setting) => { databaseSettings.forEach((setting) => {
config.set(setting.key, JSON.parse(setting.value)); config.set(setting.key, jsonParse(setting.value));
}); });
config.set('nodes.packagesMissing', ''); config.set('nodes.packagesMissing', '');
@ -320,7 +273,7 @@ export class Start extends Command {
// eslint-disable-next-line no-restricted-syntax // eslint-disable-next-line no-restricted-syntax
for (const missingPackage of missingPackages) { for (const missingPackage of missingPackages) {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
void (await loadNodesAndCredentials.loadNpmModule( void (await this.loadNodesAndCredentials.loadNpmModule(
missingPackage.packageName, missingPackage.packageName,
missingPackage.version, missingPackage.version,
)); ));
@ -358,14 +311,14 @@ export class Start extends Command {
process.env[TUNNEL_SUBDOMAIN_ENV] !== '' process.env[TUNNEL_SUBDOMAIN_ENV] !== ''
) { ) {
tunnelSubdomain = process.env[TUNNEL_SUBDOMAIN_ENV]; tunnelSubdomain = process.env[TUNNEL_SUBDOMAIN_ENV];
} else if (userSettings.tunnelSubdomain !== undefined) { } else if (this.userSettings.tunnelSubdomain !== undefined) {
tunnelSubdomain = userSettings.tunnelSubdomain; tunnelSubdomain = this.userSettings.tunnelSubdomain;
} }
if (tunnelSubdomain === undefined) { if (tunnelSubdomain === undefined) {
// When no tunnel subdomain did exist yet create a new random one // When no tunnel subdomain did exist yet create a new random one
const availableCharacters = 'abcdefghijklmnopqrstuvwxyz0123456789'; const availableCharacters = 'abcdefghijklmnopqrstuvwxyz0123456789';
userSettings.tunnelSubdomain = Array.from({ length: 24 }) this.userSettings.tunnelSubdomain = Array.from({ length: 24 })
.map(() => { .map(() => {
return availableCharacters.charAt( return availableCharacters.charAt(
Math.floor(Math.random() * availableCharacters.length), Math.floor(Math.random() * availableCharacters.length),
@ -373,7 +326,7 @@ export class Start extends Command {
}) })
.join(''); .join('');
await UserSettings.writeUserSettings(userSettings); await UserSettings.writeUserSettings(this.userSettings);
} }
const tunnelSettings: localtunnel.TunnelConfig = { const tunnelSettings: localtunnel.TunnelConfig = {
@ -393,12 +346,6 @@ export class Start extends Command {
); );
} }
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
await Server.start(); await Server.start();
// Start to get active workflows and run their triggers // Start to get active workflows and run their triggers
@ -423,39 +370,34 @@ export class Start extends Command {
process.stdin.setRawMode(true); process.stdin.setRawMode(true);
process.stdin.resume(); process.stdin.resume();
process.stdin.setEncoding('utf8'); process.stdin.setEncoding('utf8');
// eslint-disable-next-line @typescript-eslint/no-unused-vars
let inputText = '';
if (flags.open) { if (flags.open) {
Start.openBrowser(); this.openBrowser();
} }
this.log('\nPress "o" to open in Browser.'); this.log('\nPress "o" to open in Browser.');
process.stdin.on('data', (key: string) => { process.stdin.on('data', (key: string) => {
if (key === 'o') { if (key === 'o') {
Start.openBrowser(); this.openBrowser();
inputText = '';
} else if (key.charCodeAt(0) === 3) { } else if (key.charCodeAt(0) === 3) {
// Ctrl + c got pressed // Ctrl + c got pressed
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
Start.stopProcess(); this.stopProcess();
} else { } else {
// When anything else got pressed, record it and send it on enter into the child process // When anything else got pressed, record it and send it on enter into the child process
// eslint-disable-next-line no-lonely-if // eslint-disable-next-line no-lonely-if
if (key.charCodeAt(0) === 13) { if (key.charCodeAt(0) === 13) {
// send to child process and print in terminal // send to child process and print in terminal
process.stdout.write('\n'); process.stdout.write('\n');
inputText = '';
} else { } else {
// record it and write into terminal // record it and write into terminal
// eslint-disable-next-line @typescript-eslint/no-unused-vars
inputText += key;
process.stdout.write(key); process.stdout.write(key);
} }
} }
}); });
} }
} catch (error) {
await exitWithCrash('There was an error', error);
} }
async catch(error: Error) {
await this.exitWithCrash('Exiting due to an error.', error);
} }
} }

View file

@ -1,16 +1,12 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable no-console */ /* eslint-disable no-console */
import { Command, flags } from '@oclif/command'; import { flags } from '@oclif/command';
import type { IDataObject } from 'n8n-workflow'; import type { IDataObject } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { BaseCommand } from '../BaseCommand';
import { getLogger } from '@/Logger'; export class UpdateWorkflowCommand extends BaseCommand {
export class UpdateWorkflowCommand extends Command {
static description = 'Update workflows'; static description = 'Update workflows';
static examples = [ static examples = [
@ -31,11 +27,7 @@ export class UpdateWorkflowCommand extends Command {
}), }),
}; };
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async run() { async run() {
const logger = getLogger();
LoggerProxy.init(logger);
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(UpdateWorkflowCommand); const { flags } = this.parse(UpdateWorkflowCommand);
@ -56,35 +48,32 @@ export class UpdateWorkflowCommand extends Command {
console.info('No update flag like "--active=true" has been set!'); console.info('No update flag like "--active=true" has been set!');
return; return;
} }
if (!['false', 'true'].includes(flags.active)) { if (!['false', 'true'].includes(flags.active)) {
console.info('Valid values for flag "--active" are only "false" or "true"!'); console.info('Valid values for flag "--active" are only "false" or "true"!');
return; return;
} }
updateQuery.active = flags.active === 'true';
try { updateQuery.active = flags.active === 'true';
await Db.init();
const findQuery: IDataObject = {}; const findQuery: IDataObject = {};
if (flags.id) { if (flags.id) {
console.info(`Deactivating workflow with ID: ${flags.id}`); this.logger.info(`Deactivating workflow with ID: ${flags.id}`);
findQuery.id = flags.id; findQuery.id = flags.id;
} else { } else {
console.info('Deactivating all workflows'); this.logger.info('Deactivating all workflows');
findQuery.active = true; findQuery.active = true;
} }
await Db.collections.Workflow.update(findQuery, updateQuery); await Db.collections.Workflow.update(findQuery, updateQuery);
console.info('Done'); this.logger.info('Done');
} catch (e) {
console.error('Error updating database. See log messages for details.');
logger.error('\nGOT ERROR');
logger.info('====================================');
logger.error(e.message);
logger.error(e.stack);
this.exit(1);
} }
this.exit(); async catch(error: Error) {
this.logger.error('Error updating database. See log messages for details.');
this.logger.error('\nGOT ERROR');
this.logger.error('====================================');
this.logger.error(error.message);
this.logger.error(error.stack!);
} }
} }

View file

@ -1,10 +1,21 @@
import { Not } from 'typeorm'; import { Not } from 'typeorm';
import * as Db from '@/Db'; import * as Db from '@/Db';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity'; import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
import { User } from '@db/entities/User';
import { BaseCommand } from '../BaseCommand'; import { BaseCommand } from '../BaseCommand';
const defaultUserProps = {
firstName: null,
lastName: null,
email: null,
password: null,
resetPasswordToken: null,
};
export class Reset extends BaseCommand { export class Reset extends BaseCommand {
static description = '\nResets the database to the default user state'; static description = 'Resets the database to the default user state';
static examples = ['$ n8n user-management:reset'];
async run(): Promise<void> { async run(): Promise<void> {
const owner = await this.getInstanceOwner(); const owner = await this.getInstanceOwner();
@ -30,7 +41,7 @@ export class Reset extends BaseCommand {
); );
await Db.collections.User.delete({ id: Not(owner.id) }); await Db.collections.User.delete({ id: Not(owner.id) });
await Db.collections.User.save(Object.assign(owner, this.defaultUserProps)); await Db.collections.User.save(Object.assign(owner, defaultUserProps));
const danglingCredentials: CredentialsEntity[] = const danglingCredentials: CredentialsEntity[] =
(await Db.collections.Credentials.createQueryBuilder('credentials') (await Db.collections.Credentials.createQueryBuilder('credentials')
@ -58,6 +69,25 @@ export class Reset extends BaseCommand {
this.logger.info('Successfully reset the database to default user state.'); this.logger.info('Successfully reset the database to default user state.');
} }
async getInstanceOwner(): Promise<User> {
const globalRole = await Db.collections.Role.findOneByOrFail({
name: 'owner',
scope: 'global',
});
const owner = await Db.collections.User.findOneBy({ globalRoleId: globalRole.id });
if (owner) return owner;
const user = new User();
Object.assign(user, { ...defaultUserProps, globalRole });
await Db.collections.User.save(user);
return Db.collections.User.findOneByOrFail({ globalRoleId: globalRole.id });
}
async catch(error: Error): Promise<void> { async catch(error: Error): Promise<void> {
this.logger.error('Error resetting database. See log messages for details.'); this.logger.error('Error resetting database. See log messages for details.');
this.logger.error(error.message); this.logger.error(error.message);

View file

@ -1,37 +1,12 @@
/* eslint-disable @typescript-eslint/unbound-method */ /* eslint-disable @typescript-eslint/unbound-method */
import { BinaryDataManager, UserSettings } from 'n8n-core'; import { flags } from '@oclif/command';
import { Command, flags } from '@oclif/command'; import { LoggerProxy, sleep } from 'n8n-workflow';
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import config from '@/config'; import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions'; import * as ActiveExecutions from '@/ActiveExecutions';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager';
import { WebhookServer } from '@/WebhookServer'; import { WebhookServer } from '@/WebhookServer';
import { getLogger } from '@/Logger'; import { BaseCommand } from './BaseCommand';
import { initErrorHandling } from '@/ErrorReporting';
import * as CrashJournal from '@/CrashJournal';
const exitWithCrash = async (message: string, error: unknown) => { export class Webhook extends BaseCommand {
ErrorReporter.error(new Error(message, { cause: error }), { level: 'fatal' });
await sleep(2000);
process.exit(1);
};
const exitSuccessFully = async () => {
try {
await CrashJournal.cleanup();
} finally {
process.exit();
}
};
export class Webhook extends Command {
static description = 'Starts n8n webhook process. Intercepts only production URLs.'; static description = 'Starts n8n webhook process. Intercepts only production URLs.';
static examples = ['$ n8n webhook']; static examples = ['$ n8n webhook'];
@ -45,18 +20,16 @@ export class Webhook extends Command {
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
* get removed. * get removed.
*/ */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async stopProcess() {
static async stopProcess() {
LoggerProxy.info('\nStopping n8n...'); LoggerProxy.info('\nStopping n8n...');
try { try {
const externalHooks = ExternalHooks(); await this.externalHooks.run('n8n.stop', []);
await externalHooks.run('n8n.stop', []);
setTimeout(async () => { setTimeout(async () => {
// In case that something goes wrong with shutdown we // In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what // kill after max. 30 seconds no matter what
await exitSuccessFully(); await this.exitSuccessFully();
}, 30000); }, 30000);
// Wait for active workflow executions to finish // Wait for active workflow executions to finish
@ -75,14 +48,13 @@ export class Webhook extends Command {
executingWorkflows = activeExecutionsInstance.getActiveExecutions(); executingWorkflows = activeExecutionsInstance.getActiveExecutions();
} }
} catch (error) { } catch (error) {
await exitWithCrash('There was an error shutting down n8n.', error); await this.exitWithCrash('There was an error shutting down n8n.', error);
} }
await exitSuccessFully(); await this.exitSuccessFully();
} }
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async init() {
async run() {
if (config.getEnv('executions.mode') !== 'queue') { if (config.getEnv('executions.mode') !== 'queue') {
/** /**
* It is technically possible to run without queues but * It is technically possible to run without queues but
@ -99,56 +71,23 @@ export class Webhook extends Command {
this.error('Webhook processes can only run with execution mode as queue.'); this.error('Webhook processes can only run with execution mode as queue.');
} }
const logger = getLogger();
LoggerProxy.init(logger);
// Make sure that n8n shuts down gracefully if possible // Make sure that n8n shuts down gracefully if possible
process.once('SIGTERM', Webhook.stopProcess); process.once('SIGTERM', this.stopProcess);
process.once('SIGINT', Webhook.stopProcess); process.once('SIGINT', this.stopProcess);
await initErrorHandling(); await this.initCrashJournal();
await CrashJournal.init(); await super.init();
try { await this.initBinaryManager();
// Start directly with the init of the database to improve startup time await this.initExternalHooks();
const startDbInitPromise = Db.init().catch(async (error: Error) =>
exitWithCrash('There was an error initializing DB', error),
);
// Make sure the settings exist
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await UserSettings.prepareUserSettings();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes);
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Wait till the database is ready
await startDbInitPromise;
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
const server = new WebhookServer();
await server.start();
console.info('Webhook listener waiting for requests.');
} catch (error) {
await exitWithCrash('Exiting due to error.', error);
} }
async run() {
await new WebhookServer().start();
this.logger.info('Webhook listener waiting for requests.');
}
async catch(error: Error) {
await this.exitWithCrash('Exiting due to an error.', error);
} }
} }

View file

@ -1,54 +1,28 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/unbound-method */ /* eslint-disable @typescript-eslint/unbound-method */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import express from 'express'; import express from 'express';
import http from 'http'; import http from 'http';
import type PCancelable from 'p-cancelable'; import type PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command'; import { flags } from '@oclif/command';
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; import { WorkflowExecute } from 'n8n-core';
import type { IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; import type { IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow';
import { Workflow, LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow'; import { Workflow, NodeOperationError, LoggerProxy, sleep } from 'n8n-workflow';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import { NodeTypes } from '@/NodeTypes';
import * as ResponseHelper from '@/ResponseHelper'; import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers'; import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { InternalHooksManager } from '@/InternalHooksManager';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { getLogger } from '@/Logger';
import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import config from '@/config'; import config from '@/config';
import * as Queue from '@/Queue'; import * as Queue from '@/Queue';
import * as CrashJournal from '@/CrashJournal';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants'; import { N8N_VERSION } from '@/constants';
import { initErrorHandling } from '@/ErrorReporting'; import { BaseCommand } from './BaseCommand';
const exitWithCrash = async (message: string, error: unknown) => { export class Worker extends BaseCommand {
ErrorReporter.error(new Error(message, { cause: error }), { level: 'fatal' });
await sleep(2000);
process.exit(1);
};
const exitSuccessFully = async () => {
try {
await CrashJournal.cleanup();
} finally {
process.exit();
}
};
export class Worker extends Command {
static description = '\nStarts a n8n worker'; static description = '\nStarts a n8n worker';
static examples = ['$ n8n worker --concurrency=5']; static examples = ['$ n8n worker --concurrency=5'];
@ -72,7 +46,7 @@ export class Worker extends Command {
* Make for example sure that all the webhooks from third party services * Make for example sure that all the webhooks from third party services
* get removed. * get removed.
*/ */
static async stopProcess() { async stopProcess() {
LoggerProxy.info('Stopping n8n...'); LoggerProxy.info('Stopping n8n...');
// Stop accepting new jobs // Stop accepting new jobs
@ -80,8 +54,7 @@ export class Worker extends Command {
Worker.jobQueue.pause(true); Worker.jobQueue.pause(true);
try { try {
const externalHooks = ExternalHooks(); await this.externalHooks.run('n8n.stop', []);
await externalHooks.run('n8n.stop', []);
const maxStopTime = config.getEnv('queue.bull.gracefulShutdownTimeout') * 1000; const maxStopTime = config.getEnv('queue.bull.gracefulShutdownTimeout') * 1000;
@ -90,7 +63,7 @@ export class Worker extends Command {
setTimeout(async () => { setTimeout(async () => {
// In case that something goes wrong with shutdown we // In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what // kill after max. 30 seconds no matter what
await exitSuccessFully(); await this.exitSuccessFully();
}, maxStopTime); }, maxStopTime);
// Wait for active workflow executions to finish // Wait for active workflow executions to finish
@ -108,10 +81,10 @@ export class Worker extends Command {
await sleep(500); await sleep(500);
} }
} catch (error) { } catch (error) {
await exitWithCrash('There was an error shutting down n8n.', error); await this.exitWithCrash('There was an error shutting down n8n.', error);
} }
await exitSuccessFully(); await this.exitSuccessFully();
} }
async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> { async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> {
@ -128,31 +101,27 @@ export class Worker extends Command {
); );
} }
const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb); const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb);
const workflowId = currentExecutionDb.workflowData.id!;
LoggerProxy.info( LoggerProxy.info(
`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${executionId})`, `Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`,
); );
const workflowOwner = await getWorkflowOwner(currentExecutionDb.workflowData.id!.toString()); const workflowOwner = await getWorkflowOwner(workflowId);
let { staticData } = currentExecutionDb.workflowData; let { staticData } = currentExecutionDb.workflowData;
if (loadStaticData) { if (loadStaticData) {
const workflowData = await Db.collections.Workflow.findOne({ const workflowData = await Db.collections.Workflow.findOne({
select: ['id', 'staticData'], select: ['id', 'staticData'],
where: { where: {
id: currentExecutionDb.workflowData.id, id: workflowId,
}, },
}); });
if (workflowData === null) { if (workflowData === null) {
LoggerProxy.error( LoggerProxy.error(
'Worker execution failed because workflow could not be found in database.', 'Worker execution failed because workflow could not be found in database.',
{ { workflowId, executionId },
workflowId: currentExecutionDb.workflowData.id,
executionId,
},
);
throw new Error(
`The workflow with the ID "${currentExecutionDb.workflowData.id}" could not be found`,
); );
throw new Error(`The workflow with the ID "${workflowId}" could not be found`);
} }
staticData = workflowData.staticData; staticData = workflowData.staticData;
} }
@ -173,7 +142,7 @@ export class Worker extends Command {
} }
const workflow = new Workflow({ const workflow = new Workflow({
id: currentExecutionDb.workflowData.id as string, id: workflowId,
name: currentExecutionDb.workflowData.name, name: currentExecutionDb.workflowData.name,
nodes: currentExecutionDb.workflowData.nodes, nodes: currentExecutionDb.workflowData.nodes,
connections: currentExecutionDb.workflowData.connections, connections: currentExecutionDb.workflowData.connections,
@ -198,15 +167,15 @@ export class Worker extends Command {
try { try {
await PermissionChecker.check(workflow, workflowOwner.id); await PermissionChecker.check(workflow, workflowOwner.id);
} catch (error) { } catch (error) {
if (error instanceof NodeOperationError) {
const failedExecution = generateFailedExecutionFromError( const failedExecution = generateFailedExecutionFromError(
currentExecutionDb.mode, currentExecutionDb.mode,
error, error,
error.node, error.node,
); );
await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]); await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]);
return { }
success: true, return { success: true };
};
} }
additionalData.hooks.hookFunctions.sendResponse = [ additionalData.hooks.hookFunctions.sendResponse = [
@ -249,72 +218,37 @@ export class Worker extends Command {
}; };
} }
async run() { async init() {
const logger = getLogger();
LoggerProxy.init(logger);
// eslint-disable-next-line no-console
console.info('Starting n8n worker...');
// Make sure that n8n shuts down gracefully if possible // Make sure that n8n shuts down gracefully if possible
process.once('SIGTERM', Worker.stopProcess); process.once('SIGTERM', this.stopProcess);
process.once('SIGINT', Worker.stopProcess); process.once('SIGINT', this.stopProcess);
await initErrorHandling(); await this.initCrashJournal();
await CrashJournal.init(); await super.init();
this.logger.debug('Starting n8n worker...');
// Wrap that the process does not close but we can still use async await this.initBinaryManager();
await (async () => { await this.initExternalHooks();
try { }
async run() {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Worker); const { flags } = this.parse(Worker);
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch(async (error: Error) =>
exitWithCrash('There was an error initializing DB', error),
);
// Make sure the settings exist
await UserSettings.prepareUserSettings();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes);
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Wait till the database is ready
await startDbInitPromise;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
const queue = await Queue.getInstance(); const queue = await Queue.getInstance();
Worker.jobQueue = queue.getBullObjectInstance(); Worker.jobQueue = queue.getBullObjectInstance();
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, nodeTypes)); Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, this.nodeTypes));
const instanceId = await UserSettings.getInstanceId(); this.logger.info('\nn8n worker is now ready');
this.logger.info(` * Version: ${N8N_VERSION}`);
this.logger.info(` * Concurrency: ${flags.concurrency}`);
this.logger.info('');
await InternalHooksManager.init(instanceId, nodeTypes); Worker.jobQueue.on('global:progress', (jobId: Queue.JobId, progress) => {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
console.info('\nn8n worker is now ready');
console.info(` * Version: ${N8N_VERSION}`);
console.info(` * Concurrency: ${flags.concurrency}`);
console.info('');
Worker.jobQueue.on('global:progress', (jobId, progress) => {
// Progress of a job got updated which does get used // Progress of a job got updated which does get used
// to communicate that a job got canceled. // to communicate that a job got canceled.
@ -341,21 +275,21 @@ export class Worker extends Command {
cumulativeTimeout += now - lastTimer; cumulativeTimeout += now - lastTimer;
lastTimer = now; lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) { if (cumulativeTimeout > redisConnectionTimeoutLimit) {
logger.error( this.logger.error(
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
); );
process.exit(1); process.exit(1);
} }
} }
logger.warn('Redis unavailable - trying to reconnect...'); this.logger.warn('Redis unavailable - trying to reconnect...');
} else if (error.toString().includes('Error initializing Lua scripts')) { } else if (error.toString().includes('Error initializing Lua scripts')) {
// This is a non-recoverable error // This is a non-recoverable error
// Happens when worker starts and Redis is unavailable // Happens when worker starts and Redis is unavailable
// Even if Redis comes back online, worker will be zombie // Even if Redis comes back online, worker will be zombie
logger.error('Error initializing worker.'); this.logger.error('Error initializing worker.');
process.exit(2); process.exit(2);
} else { } else {
logger.error('Error from queue: ', error); this.logger.error('Error from queue: ', error);
throw error; throw error;
} }
}); });
@ -384,7 +318,7 @@ export class Worker extends Command {
// DB ping // DB ping
await connection.query('SELECT 1'); await connection.query('SELECT 1');
} catch (e) { } catch (e) {
LoggerProxy.error('No Database connection!', e); LoggerProxy.error('No Database connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Database connection!'); const error = new ResponseHelper.ServiceUnavailableError('No Database connection!');
return ResponseHelper.sendErrorResponse(res, error); return ResponseHelper.sendErrorResponse(res, error);
} }
@ -395,7 +329,7 @@ export class Worker extends Command {
// Redis ping // Redis ping
await Worker.jobQueue.client.ping(); await Worker.jobQueue.client.ping();
} catch (e) { } catch (e) {
LoggerProxy.error('No Redis connection!', e); LoggerProxy.error('No Redis connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
return ResponseHelper.sendErrorResponse(res, error); return ResponseHelper.sendErrorResponse(res, error);
} }
@ -412,21 +346,21 @@ export class Worker extends Command {
); );
server.listen(port, () => { server.listen(port, () => {
console.info(`\nn8n worker health check via, port ${port}`); this.logger.info(`\nn8n worker health check via, port ${port}`);
}); });
server.on('error', (error: Error & { code: string }) => { server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') { if (error.code === 'EADDRINUSE') {
console.log( this.logger.error(
`n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`,
); );
process.exit(1); process.exit(1);
} }
}); });
} }
} catch (error) {
await exitWithCrash('Worker process cannot continue.', error);
} }
})();
async catch(error: Error) {
await this.exitWithCrash('Worker exiting due to an error.', error);
} }
} }

View file

@ -25,6 +25,8 @@ export function getN8nPackageJson() {
return jsonParse<n8n.PackageJson>(readFileSync(join(CLI_DIR, 'package.json'), 'utf8')); return jsonParse<n8n.PackageJson>(readFileSync(join(CLI_DIR, 'package.json'), 'utf8'));
} }
export const START_NODES = ['n8n-nodes-base.start', 'n8n-nodes-base.manualTrigger'];
export const N8N_VERSION = getN8nPackageJson().version; export const N8N_VERSION = getN8nPackageJson().version;
export const NODE_PACKAGE_PREFIX = 'n8n-nodes-'; export const NODE_PACKAGE_PREFIX = 'n8n-nodes-';

View file

@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { CliWorkflowOperationError, SubworkflowOperationError } from 'n8n-workflow'; import { CliWorkflowOperationError, SubworkflowOperationError } from 'n8n-workflow';
import type { INode } from 'n8n-workflow'; import type { INode } from 'n8n-workflow';
import { START_NODES } from './constants';
function findWorkflowStart(executionMode: 'integrated' | 'cli') { function findWorkflowStart(executionMode: 'integrated' | 'cli') {
return function (nodes: INode[]) { return function (nodes: INode[]) {
@ -10,7 +11,7 @@ function findWorkflowStart(executionMode: 'integrated' | 'cli') {
if (executeWorkflowTriggerNode) return executeWorkflowTriggerNode; if (executeWorkflowTriggerNode) return executeWorkflowTriggerNode;
const startNode = nodes.find((node) => node.type === 'n8n-nodes-base.start'); const startNode = nodes.find((node) => START_NODES.includes(node.type));
if (startNode) return startNode; if (startNode) return startNode;

View file

@ -3,8 +3,9 @@ import type { ILogger, LogTypes } from './Interfaces';
let logger: ILogger | undefined; let logger: ILogger | undefined;
export function init(loggerInstance: ILogger) { export function init<L extends ILogger>(loggerInstance: L) {
logger = loggerInstance; logger = loggerInstance;
return loggerInstance;
} }
export function getInstance(): ILogger { export function getInstance(): ILogger {