From 0251996b2ad277e6d88a7cdd00e242b830ca93d9 Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Thu, 31 Dec 2020 09:15:46 +0100 Subject: [PATCH] :sparkles: Add bull to execute workflows --- packages/cli/commands/worker.ts | 190 ++++++++++++++++++ packages/cli/config/index.ts | 43 ++++ packages/cli/package.json | 2 + packages/cli/src/Interfaces.ts | 15 ++ packages/cli/src/WebhookHelpers.ts | 2 +- .../cli/src/WorkflowExecuteAdditionalData.ts | 27 +++ packages/cli/src/WorkflowRunner.ts | 104 +++++++++- packages/cli/src/WorkflowRunnerProcess.ts | 3 + packages/core/src/WorkflowExecute.ts | 2 +- 9 files changed, 384 insertions(+), 4 deletions(-) create mode 100644 packages/cli/commands/worker.ts diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts new file mode 100644 index 0000000000..bcc9db18ff --- /dev/null +++ b/packages/cli/commands/worker.ts @@ -0,0 +1,190 @@ +import * as PCancelable from 'p-cancelable'; + +import { Command, flags } from '@oclif/command'; +import { + UserSettings, + WorkflowExecute, +} from 'n8n-core'; + +import { + INodeTypes, + IRun, + IWorkflowExecuteHooks, + Workflow, + WorkflowHooks, +} from 'n8n-workflow'; + +import { + CredentialsOverwrites, + CredentialTypes, + Db, + ExternalHooks, + GenericHelpers, + IBullJobData, + IBullJobResponse, + LoadNodesAndCredentials, + NodeTypes, + WorkflowCredentials, + WorkflowExecuteAdditionalData, +} from "../src"; + +import * as config from '../config'; +import * as Bull from 'bull'; + +export class Worker extends Command { + static description = '\nStarts a n8n worker'; + + static examples = [ + `$ n8n worker --concurrency=5`, + ]; + + static flags = { + help: flags.help({ char: 'h' }), + concurrency: flags.integer({ + default: 10, + description: 'How many jobs can run in parallel.', + }), + }; + + runningJobs: { + [key: string]: PCancelable; + } = {}; + + + getWorkflowHooks(jobData: IBullJobData, executionId: string): WorkflowHooks { + const hookFunctions: IWorkflowExecuteHooks = {}; + + const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute(); + for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } + hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + + return new WorkflowHooks(hookFunctions, jobData.executionMode, executionId, jobData.workflowData, { retryOf: jobData.retryOf as string }); + } + + + async runJob(job: Bull.Job, nodeTypes: INodeTypes): Promise { + const jobData = job.data as IBullJobData; + + console.log(`Start job: ${job.id} (Workflow ID: ${jobData.workflowData.id})`); + // TODO: Can in the future query most of that data from the DB to lighten redis load + + + const workflow = new Workflow({ id: jobData.workflowData.id as string, name: jobData.workflowData.name, nodes: jobData.workflowData!.nodes, connections: jobData.workflowData!.connections, active: jobData.workflowData!.active, nodeTypes, staticData: jobData.workflowData!.staticData, settings: jobData.workflowData!.settings }); + + const credentials = await WorkflowCredentials(jobData.workflowData.nodes); + + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); + additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksIntegrated(jobData.executionMode, job.data.executionId, jobData.workflowData, { retryOf: jobData.retryOf as string }); + + let workflowExecute: WorkflowExecute; + let workflowRun: PCancelable; + if (jobData.executionData !== undefined) { + workflowExecute = new WorkflowExecute(additionalData, jobData.executionMode, jobData.executionData); + workflowRun = workflowExecute.processRunExecutionData(workflow); + } else if (jobData.runData === undefined || jobData.startNodes === undefined || jobData.startNodes.length === 0 || jobData.destinationNode === undefined) { + // Execute all nodes + + // Can execute without webhook so go on + workflowExecute = new WorkflowExecute(additionalData, jobData.executionMode); + workflowRun = workflowExecute.run(workflow, undefined, jobData.destinationNode); + } else { + // Execute only the nodes between start and destination nodes + workflowExecute = new WorkflowExecute(additionalData, jobData.executionMode); + workflowRun = workflowExecute.runPartialWorkflow(workflow, jobData.runData, jobData.startNodes, jobData.destinationNode); + } + + this.runningJobs[job.id] = workflowRun; + + // Wait till the execution is finished + const runData = await workflowRun; + + delete this.runningJobs[job.id]; + + return { + runData, + }; + } + + async run() { + console.log('Starting n8n worker...'); + + try { + const { flags } = this.parse(Worker); + + // Start directly with the init of the database to improve startup time + const startDbInitPromise = Db.init().catch(error => { + console.error(`There was an error initializing DB: ${error.message}`); + + // @ts-ignore + process.emit('SIGINT'); + }); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); + + // Load all node and credential types + const loadNodesAndCredentials = LoadNodesAndCredentials(); + await loadNodesAndCredentials.init(); + + // Load the credentials overwrites if any exist + const credentialsOverwrites = CredentialsOverwrites(); + await credentialsOverwrites.init(); + + // Load all external hooks + const externalHooks = ExternalHooks(); + await externalHooks.init(); + + // Add the found types to an instance other parts of the application can use + const nodeTypes = NodeTypes(); + await nodeTypes.init(loadNodesAndCredentials.nodeTypes); + const credentialTypes = CredentialTypes(); + await credentialTypes.init(loadNodesAndCredentials.credentialTypes); + + // Wait till the database is ready + await startDbInitPromise; + + // Connect to bull-queue + const prefix = config.get('queue.bull.prefix') as string; + const redisOptions = config.get('queue.bull.redis') as object; + const jobQueue = new Bull('jobs', { prefix, redis: redisOptions }); + jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes)); + + const versions = await GenericHelpers.getVersions(); + + console.log('\nn8n worker is now ready'); + console.log(` * Version: ${versions.cli}`); + console.log(` * Concurrency: ${flags.concurrency}`); + console.log(''); + + jobQueue.on('global:progress', (jobId, progress) => { + // Progress of a job got updated which does get used + // to communicate that a job got canceled. + + if (progress === -1) { + // Job has to get canceled + if (this.runningJobs[jobId] !== undefined) { + // Job is processed by current worker so cancel + this.runningJobs[jobId].cancel(); + delete this.runningJobs[jobId]; + } + } + }); + + + } catch (e) { + // TODO: Do some testing here how to really display the error + // this.error(`There was an error: ${error.message}`); + console.error('\nGOT ERROR'); + console.log('===================================='); + console.error(e.message); + console.error(e.stack); + + // @ts-ignore + process.emit('SIGINT'); + } + } +} diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index 5cbacddf85..19a8c5525e 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -159,6 +159,13 @@ const config = convict({ env: 'EXECUTIONS_PROCESS', }, + mode: { + doc: 'If it should run executions directly or via queue', + format: ['regular', 'queue'], + default: 'regular', + env: 'EXECUTIONS_MODE', + }, + // A Workflow times out and gets canceled after this time (seconds). // If the workflow is executed in the main process a soft timeout // is executed (takes effect after the current node finishes). @@ -239,6 +246,42 @@ const config = convict({ }, }, + queue: { + bull: { + prefix: { + doc: 'Prefix for all queue keys', + format: String, + default: '', + env: 'QUEUE_BULL_PREFIX', + }, + redis: { + db: { + doc: 'Redis DB', + format: Number, + default: 0, + env: 'QUEUE_BULL_REDIS_DB', + }, + host: { + doc: 'Redis Host', + format: String, + default: 'localhost', + env: 'QUEUE_BULL_REDIS_HOST', + }, + password: { + doc: 'Redis Password', + format: String, + default: '', + env: 'QUEUE_BULL_REDIS_PASSWORD', + }, + port: { + doc: 'Redis Port', + format: Number, + default: 6379, + env: 'QUEUE_BULL_REDIS_PORT', + }, + }, + }, + }, generic: { // The timezone to use. Is important for nodes like "Cron" which start the // workflow automatically at a specified time. This setting can also be diff --git a/packages/cli/package.json b/packages/cli/package.json index ac7c6ce5a3..e0c7d05497 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -56,6 +56,7 @@ "@oclif/dev-cli": "^1.22.2", "@types/basic-auth": "^1.1.2", "@types/bcryptjs": "^2.4.1", + "@types/bull": "^3.3.10", "@types/compression": "1.0.1", "@types/connect-history-api-fallback": "^1.3.1", "@types/convict": "^4.2.1", @@ -86,6 +87,7 @@ "bcryptjs": "^2.4.3", "body-parser": "^1.18.3", "body-parser-xml": "^1.1.0", + "bull": "^3.19.0", "client-oauth2": "^4.2.5", "compression": "^1.7.4", "connect-history-api-fallback": "^1.6.0", diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index b808a73564..74bc5bc8d9 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -33,6 +33,21 @@ export interface IActivationError { }; } +export interface IBullJobData { + destinationNode?: string; + executionId: string; + executionMode: WorkflowExecuteMode; + executionData?: IRunExecutionData; + runData?: IRunData; + retryOf?: number | string | ObjectID; + startNodes?: string[]; + workflowData: IWorkflowBase; +} + +export interface IBullJobResponse { + runData: IRun, +} + export interface ICustomRequest extends Request { parsedUrl: Url | undefined; } diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 6d3504963a..f77613b29b 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -283,7 +283,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { // Start now to run the workflow const workflowRunner = new WorkflowRunner(); - const executionId = await workflowRunner.run(runData, true); + const executionId = await workflowRunner.run(runData, true, !didSendResponse); // Get a promise which resolves when the workflow did execute and send then response const executePromise = activeExecutions.getPostExecutePromise(executionId) as Promise; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 592b13ca0f..3820ec12b5 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -485,6 +485,26 @@ export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionI const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode); const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } + hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); +} + + +/** + * Returns WorkflowHooks instance for main process if workflow runs via worker + */ +export function getWorkflowHooksWorkerMain(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks { + optionalParameters = optionalParameters || {}; + const hookFunctions = hookFunctionsPush(); + const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); + for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); } return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); @@ -503,15 +523,22 @@ export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, execut const hookFunctions = hookFunctionsSave(); const pushFunctions = hookFunctionsPush(); for (const key of Object.keys(pushFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]); } if (isMainProcess) { const preExecuteFunctions = hookFunctionsPreExecute(); for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); } } return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string}); } + diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 3306282a4e..af865a16d7 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -1,8 +1,10 @@ import { ActiveExecutions, + IBullJobResponse, CredentialsOverwrites, CredentialTypes, ExternalHooks, + IBullJobData, ICredentialsOverwrite, ICredentialsTypeData, IProcessMessageDataHook, @@ -33,17 +35,28 @@ import * as PCancelable from 'p-cancelable'; import { join as pathJoin } from 'path'; import { fork } from 'child_process'; +import * as Bull from 'bull'; export class WorkflowRunner { activeExecutions: ActiveExecutions.ActiveExecutions; credentialsOverwrites: ICredentialsOverwrite; push: Push.Push; + jobQueue: Bull.Queue; constructor() { this.push = Push.getInstance(); this.activeExecutions = ActiveExecutions.getInstance(); this.credentialsOverwrites = CredentialsOverwrites().getAll(); + + const executionsMode = config.get('executions.mode') as string; + + if (executionsMode === 'queue') { + // Connect to bull-queue + const prefix = config.get('queue.bull.prefix') as string; + const redisOptions = config.get('queue.bull.redis') as object; + this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions }); + } } @@ -99,11 +112,16 @@ export class WorkflowRunner { * @returns {Promise} * @memberof WorkflowRunner */ - async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { + async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise { const executionsProcess = config.get('executions.process') as string; + const executionsMode = config.get('executions.mode') as string; let executionId: string; - if (executionsProcess === 'main') { + if (executionsMode === 'queue' && data.executionMode !== 'manual') { + // Do not run "manual" executions in bull because sending events to the + // frontend would not be possible + executionId = await this.runBull(data, loadStaticData, realtime); + } else if (executionsProcess === 'main') { executionId = await this.runMainProcess(data, loadStaticData); } else { executionId = await this.runSubprocess(data, loadStaticData); @@ -191,6 +209,88 @@ export class WorkflowRunner { return executionId; } + async runBull(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise { + + // TODO: If "loadStaticData" is set to true it has to load data new on worker + + // Register the active execution + const executionId = this.activeExecutions.add(data, undefined); + + const jobData: IBullJobData = { + destinationNode: data.destinationNode, + executionId, + executionMode: data.executionMode, + executionData: data.executionData, + retryOf: data.retryOf, + runData: data.runData, + startNodes: data.startNodes, + workflowData: data.workflowData, + }; + + let priority = 100; + if (realtime === true) { + // Jobs which require a direct response get a higher priority + priority = 50; + } + // TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds. + // Check if they get retried by default and how often. + const jobOptions = { + priority, + removeOnComplete: true, + removeOnFail: true, + }; + const job = await this.jobQueue.add(jobData, jobOptions); + console.log('Started with ID: ' + job.id.toString()); + + const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined }); + + // Normally also workflow should be supplied here but as it only used for sending + // data to editor-UI is not needed. + hooks.executeHookFunctions('workflowExecuteBefore', []); + + const workflowExecution: PCancelable = new PCancelable(async (resolve, reject, onCancel) => { + onCancel.shouldReject = false; + onCancel(async () => { + if (await job.isActive()) { + // Job is already running so tell it to stop + await job.progress(-1); + } else { + // Job did not get started yet so remove from queue + await job.remove(); + + const fullRunData: IRun = { + data: { + resultData: { + error: { + message: 'Workflow has been canceled!', + } as IExecutionError, + runData: {}, + } + }, + mode: data.executionMode, + startedAt: new Date(), + stoppedAt: new Date(), + }; + + this.activeExecutions.remove(executionId, fullRunData); + resolve(fullRunData); + } + }); + + const jobData: IBullJobResponse = await job.finished(); + this.activeExecutions.remove(executionId, jobData.runData); + // Normally also static data should be supplied here but as it only used for sending + // data to editor-UI is not needed. + hooks.executeHookFunctions('workflowExecuteAfter', [jobData.runData]); + + resolve(jobData.runData); + }); + + this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); + return executionId; + } + + /** * Run the workflow * diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 8fc749b4b8..dee857fd08 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -152,6 +152,9 @@ export class WorkflowRunnerProcess { const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute(); for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); } diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index c08ea2a020..36c13e60b9 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -615,7 +615,7 @@ export class WorkflowExecute { // be executed in the meantime await new Promise((resolve) => { setTimeout(() => { - resolve(); + resolve(undefined); }, waitBetweenTries); }); }