Add bull to execute workflows

This commit is contained in:
Jan Oberhauser 2020-12-31 09:15:46 +01:00
parent b0e17453f4
commit 0251996b2a
9 changed files with 384 additions and 4 deletions

View file

@ -0,0 +1,190 @@
import * as PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command';
import {
UserSettings,
WorkflowExecute,
} from 'n8n-core';
import {
INodeTypes,
IRun,
IWorkflowExecuteHooks,
Workflow,
WorkflowHooks,
} from 'n8n-workflow';
import {
CredentialsOverwrites,
CredentialTypes,
Db,
ExternalHooks,
GenericHelpers,
IBullJobData,
IBullJobResponse,
LoadNodesAndCredentials,
NodeTypes,
WorkflowCredentials,
WorkflowExecuteAdditionalData,
} from "../src";
import * as config from '../config';
import * as Bull from 'bull';
export class Worker extends Command {
static description = '\nStarts a n8n worker';
static examples = [
`$ n8n worker --concurrency=5`,
];
static flags = {
help: flags.help({ char: 'h' }),
concurrency: flags.integer({
default: 10,
description: 'How many jobs can run in parallel.',
}),
};
runningJobs: {
[key: string]: PCancelable<IRun>;
} = {};
getWorkflowHooks(jobData: IBullJobData, executionId: string): WorkflowHooks {
const hookFunctions: IWorkflowExecuteHooks = {};
const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, jobData.executionMode, executionId, jobData.workflowData, { retryOf: jobData.retryOf as string });
}
async runJob(job: Bull.Job, nodeTypes: INodeTypes): Promise<IBullJobResponse> {
const jobData = job.data as IBullJobData;
console.log(`Start job: ${job.id} (Workflow ID: ${jobData.workflowData.id})`);
// TODO: Can in the future query most of that data from the DB to lighten redis load
const workflow = new Workflow({ id: jobData.workflowData.id as string, name: jobData.workflowData.name, nodes: jobData.workflowData!.nodes, connections: jobData.workflowData!.connections, active: jobData.workflowData!.active, nodeTypes, staticData: jobData.workflowData!.staticData, settings: jobData.workflowData!.settings });
const credentials = await WorkflowCredentials(jobData.workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksIntegrated(jobData.executionMode, job.data.executionId, jobData.workflowData, { retryOf: jobData.retryOf as string });
let workflowExecute: WorkflowExecute;
let workflowRun: PCancelable<IRun>;
if (jobData.executionData !== undefined) {
workflowExecute = new WorkflowExecute(additionalData, jobData.executionMode, jobData.executionData);
workflowRun = workflowExecute.processRunExecutionData(workflow);
} else if (jobData.runData === undefined || jobData.startNodes === undefined || jobData.startNodes.length === 0 || jobData.destinationNode === undefined) {
// Execute all nodes
// Can execute without webhook so go on
workflowExecute = new WorkflowExecute(additionalData, jobData.executionMode);
workflowRun = workflowExecute.run(workflow, undefined, jobData.destinationNode);
} else {
// Execute only the nodes between start and destination nodes
workflowExecute = new WorkflowExecute(additionalData, jobData.executionMode);
workflowRun = workflowExecute.runPartialWorkflow(workflow, jobData.runData, jobData.startNodes, jobData.destinationNode);
}
this.runningJobs[job.id] = workflowRun;
// Wait till the execution is finished
const runData = await workflowRun;
delete this.runningJobs[job.id];
return {
runData,
};
}
async run() {
console.log('Starting n8n worker...');
try {
const { flags } = this.parse(Worker);
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch(error => {
console.error(`There was an error initializing DB: ${error.message}`);
// @ts-ignore
process.emit('SIGINT');
});
// Make sure the settings exist
await UserSettings.prepareUserSettings();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Load the credentials overwrites if any exist
const credentialsOverwrites = CredentialsOverwrites();
await credentialsOverwrites.init();
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes();
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
const credentialTypes = CredentialTypes();
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
// Wait till the database is ready
await startDbInitPromise;
// Connect to bull-queue
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
const jobQueue = new Bull('jobs', { prefix, redis: redisOptions });
jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes));
const versions = await GenericHelpers.getVersions();
console.log('\nn8n worker is now ready');
console.log(` * Version: ${versions.cli}`);
console.log(` * Concurrency: ${flags.concurrency}`);
console.log('');
jobQueue.on('global:progress', (jobId, progress) => {
// Progress of a job got updated which does get used
// to communicate that a job got canceled.
if (progress === -1) {
// Job has to get canceled
if (this.runningJobs[jobId] !== undefined) {
// Job is processed by current worker so cancel
this.runningJobs[jobId].cancel();
delete this.runningJobs[jobId];
}
}
});
} catch (e) {
// TODO: Do some testing here how to really display the error
// this.error(`There was an error: ${error.message}`);
console.error('\nGOT ERROR');
console.log('====================================');
console.error(e.message);
console.error(e.stack);
// @ts-ignore
process.emit('SIGINT');
}
}
}

View file

@ -159,6 +159,13 @@ const config = convict({
env: 'EXECUTIONS_PROCESS', env: 'EXECUTIONS_PROCESS',
}, },
mode: {
doc: 'If it should run executions directly or via queue',
format: ['regular', 'queue'],
default: 'regular',
env: 'EXECUTIONS_MODE',
},
// A Workflow times out and gets canceled after this time (seconds). // A Workflow times out and gets canceled after this time (seconds).
// If the workflow is executed in the main process a soft timeout // If the workflow is executed in the main process a soft timeout
// is executed (takes effect after the current node finishes). // is executed (takes effect after the current node finishes).
@ -239,6 +246,42 @@ const config = convict({
}, },
}, },
queue: {
bull: {
prefix: {
doc: 'Prefix for all queue keys',
format: String,
default: '',
env: 'QUEUE_BULL_PREFIX',
},
redis: {
db: {
doc: 'Redis DB',
format: Number,
default: 0,
env: 'QUEUE_BULL_REDIS_DB',
},
host: {
doc: 'Redis Host',
format: String,
default: 'localhost',
env: 'QUEUE_BULL_REDIS_HOST',
},
password: {
doc: 'Redis Password',
format: String,
default: '',
env: 'QUEUE_BULL_REDIS_PASSWORD',
},
port: {
doc: 'Redis Port',
format: Number,
default: 6379,
env: 'QUEUE_BULL_REDIS_PORT',
},
},
},
},
generic: { generic: {
// The timezone to use. Is important for nodes like "Cron" which start the // The timezone to use. Is important for nodes like "Cron" which start the
// workflow automatically at a specified time. This setting can also be // workflow automatically at a specified time. This setting can also be

View file

@ -56,6 +56,7 @@
"@oclif/dev-cli": "^1.22.2", "@oclif/dev-cli": "^1.22.2",
"@types/basic-auth": "^1.1.2", "@types/basic-auth": "^1.1.2",
"@types/bcryptjs": "^2.4.1", "@types/bcryptjs": "^2.4.1",
"@types/bull": "^3.3.10",
"@types/compression": "1.0.1", "@types/compression": "1.0.1",
"@types/connect-history-api-fallback": "^1.3.1", "@types/connect-history-api-fallback": "^1.3.1",
"@types/convict": "^4.2.1", "@types/convict": "^4.2.1",
@ -86,6 +87,7 @@
"bcryptjs": "^2.4.3", "bcryptjs": "^2.4.3",
"body-parser": "^1.18.3", "body-parser": "^1.18.3",
"body-parser-xml": "^1.1.0", "body-parser-xml": "^1.1.0",
"bull": "^3.19.0",
"client-oauth2": "^4.2.5", "client-oauth2": "^4.2.5",
"compression": "^1.7.4", "compression": "^1.7.4",
"connect-history-api-fallback": "^1.6.0", "connect-history-api-fallback": "^1.6.0",

View file

@ -33,6 +33,21 @@ export interface IActivationError {
}; };
} }
export interface IBullJobData {
destinationNode?: string;
executionId: string;
executionMode: WorkflowExecuteMode;
executionData?: IRunExecutionData;
runData?: IRunData;
retryOf?: number | string | ObjectID;
startNodes?: string[];
workflowData: IWorkflowBase;
}
export interface IBullJobResponse {
runData: IRun,
}
export interface ICustomRequest extends Request { export interface ICustomRequest extends Request {
parsedUrl: Url | undefined; parsedUrl: Url | undefined;
} }

View file

@ -283,7 +283,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
// Start now to run the workflow // Start now to run the workflow
const workflowRunner = new WorkflowRunner(); const workflowRunner = new WorkflowRunner();
const executionId = await workflowRunner.run(runData, true); const executionId = await workflowRunner.run(runData, true, !didSendResponse);
// Get a promise which resolves when the workflow did execute and send then response // Get a promise which resolves when the workflow did execute and send then response
const executePromise = activeExecutions.getPostExecutePromise(executionId) as Promise<IExecutionDb | undefined>; const executePromise = activeExecutions.getPostExecutePromise(executionId) as Promise<IExecutionDb | undefined>;

View file

@ -485,6 +485,26 @@ export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionI
const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode); const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode);
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) { for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker
*/
export function getWorkflowHooksWorkerMain(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPush();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
} }
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
@ -503,15 +523,22 @@ export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, execut
const hookFunctions = hookFunctionsSave(); const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush(); const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) { for (const key of Object.keys(pushFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]); hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]);
} }
if (isMainProcess) { if (isMainProcess) {
const preExecuteFunctions = hookFunctionsPreExecute(); const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) { for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
} }
} }
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string}); return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string});
} }

View file

@ -1,8 +1,10 @@
import { import {
ActiveExecutions, ActiveExecutions,
IBullJobResponse,
CredentialsOverwrites, CredentialsOverwrites,
CredentialTypes, CredentialTypes,
ExternalHooks, ExternalHooks,
IBullJobData,
ICredentialsOverwrite, ICredentialsOverwrite,
ICredentialsTypeData, ICredentialsTypeData,
IProcessMessageDataHook, IProcessMessageDataHook,
@ -33,17 +35,28 @@ import * as PCancelable from 'p-cancelable';
import { join as pathJoin } from 'path'; import { join as pathJoin } from 'path';
import { fork } from 'child_process'; import { fork } from 'child_process';
import * as Bull from 'bull';
export class WorkflowRunner { export class WorkflowRunner {
activeExecutions: ActiveExecutions.ActiveExecutions; activeExecutions: ActiveExecutions.ActiveExecutions;
credentialsOverwrites: ICredentialsOverwrite; credentialsOverwrites: ICredentialsOverwrite;
push: Push.Push; push: Push.Push;
jobQueue: Bull.Queue;
constructor() { constructor() {
this.push = Push.getInstance(); this.push = Push.getInstance();
this.activeExecutions = ActiveExecutions.getInstance(); this.activeExecutions = ActiveExecutions.getInstance();
this.credentialsOverwrites = CredentialsOverwrites().getAll(); this.credentialsOverwrites = CredentialsOverwrites().getAll();
const executionsMode = config.get('executions.mode') as string;
if (executionsMode === 'queue') {
// Connect to bull-queue
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions });
}
} }
@ -99,11 +112,16 @@ export class WorkflowRunner {
* @returns {Promise<string>} * @returns {Promise<string>}
* @memberof WorkflowRunner * @memberof WorkflowRunner
*/ */
async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> { async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise<string> {
const executionsProcess = config.get('executions.process') as string; const executionsProcess = config.get('executions.process') as string;
const executionsMode = config.get('executions.mode') as string;
let executionId: string; let executionId: string;
if (executionsProcess === 'main') { if (executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
executionId = await this.runBull(data, loadStaticData, realtime);
} else if (executionsProcess === 'main') {
executionId = await this.runMainProcess(data, loadStaticData); executionId = await this.runMainProcess(data, loadStaticData);
} else { } else {
executionId = await this.runSubprocess(data, loadStaticData); executionId = await this.runSubprocess(data, loadStaticData);
@ -191,6 +209,88 @@ export class WorkflowRunner {
return executionId; return executionId;
} }
async runBull(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise<string> {
// TODO: If "loadStaticData" is set to true it has to load data new on worker
// Register the active execution
const executionId = this.activeExecutions.add(data, undefined);
const jobData: IBullJobData = {
destinationNode: data.destinationNode,
executionId,
executionMode: data.executionMode,
executionData: data.executionData,
retryOf: data.retryOf,
runData: data.runData,
startNodes: data.startNodes,
workflowData: data.workflowData,
};
let priority = 100;
if (realtime === true) {
// Jobs which require a direct response get a higher priority
priority = 50;
}
// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.
// Check if they get retried by default and how often.
const jobOptions = {
priority,
removeOnComplete: true,
removeOnFail: true,
};
const job = await this.jobQueue.add(jobData, jobOptions);
console.log('Started with ID: ' + job.id.toString());
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined });
// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
hooks.executeHookFunctions('workflowExecuteBefore', []);
const workflowExecution: PCancelable<IRun> = new PCancelable(async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
if (await job.isActive()) {
// Job is already running so tell it to stop
await job.progress(-1);
} else {
// Job did not get started yet so remove from queue
await job.remove();
const fullRunData: IRun = {
data: {
resultData: {
error: {
message: 'Workflow has been canceled!',
} as IExecutionError,
runData: {},
}
},
mode: data.executionMode,
startedAt: new Date(),
stoppedAt: new Date(),
};
this.activeExecutions.remove(executionId, fullRunData);
resolve(fullRunData);
}
});
const jobData: IBullJobResponse = await job.finished();
this.activeExecutions.remove(executionId, jobData.runData);
// Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed.
hooks.executeHookFunctions('workflowExecuteAfter', [jobData.runData]);
resolve(jobData.runData);
});
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
/** /**
* Run the workflow * Run the workflow
* *

View file

@ -152,6 +152,9 @@ export class WorkflowRunnerProcess {
const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute(); const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) { for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
} }

View file

@ -615,7 +615,7 @@ export class WorkflowExecute {
// be executed in the meantime // be executed in the meantime
await new Promise((resolve) => { await new Promise((resolve) => {
setTimeout(() => { setTimeout(() => {
resolve(); resolve(undefined);
}, waitBetweenTries); }, waitBetweenTries);
}); });
} }