From 95cb1b2788d33f9b534c6dacb6f94262a1c7e08d Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Fri, 17 Jan 2020 19:34:31 -0600 Subject: [PATCH] :sparkles: Add possibility to execute workflows in same process --- docs/configuration.md | 14 + packages/cli/config/index.ts | 6 + packages/cli/package.json | 1 + packages/cli/src/ActiveExecutions.ts | 40 ++- packages/cli/src/Interfaces.ts | 4 +- packages/cli/src/NodeTypes.ts | 2 +- packages/cli/src/WorkflowRunner.ts | 73 ++++- packages/core/package.json | 1 + packages/core/src/WorkflowExecute.ts | 421 ++++++++++++++------------- 9 files changed, 353 insertions(+), 209 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 65dd36f472..2bd789e2b3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -77,6 +77,20 @@ These settings can also be overwritten on a per workflow basis in the workflow settings in the Editor UI. +## Execute In Same Process + +All workflows get executed in their own separate process. This ensures that all CPU cores +get used and that they do not block each other on CPU intensive tasks. Additionally does +the crash of one execution not take down the whole application. The disadvantage is, however, +that it slows down the start-time considerably and uses much more memory. So in case, the +workflows are not CPU intensive and they have to start very fast it is possible to run them +all directly in the main-process with this setting. + +```bash +export EXECUTIONS_SAME_PROCESS=true +``` + + ## Exclude Nodes It is possible to not allow users to use nodes of a specific node type. If you, for example, diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index f8b776ec82..b0eae64bd6 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -84,6 +84,12 @@ const config = convict({ default: false, env: 'EXECUTIONS_DATA_SAVE_MANUAL_EXECUTIONS' }, + + sameProcessExecution: { + doc: 'Executes the workflows in the same process instead of in a separate one', + default: false, + env: 'EXECUTIONS_SAME_PROCESS' + }, }, generic: { diff --git a/packages/cli/package.json b/packages/cli/package.json index 8e58aee981..b9c913aeef 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -66,6 +66,7 @@ "@types/request-promise-native": "^1.0.15", "jest": "^24.9.0", "nodemon": "^2.0.2", + "p-cancelable": "^2.0.0", "run-script-os": "^1.0.7", "ts-jest": "^24.0.2", "tslint": "^5.17.0", diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index f3cfba16f2..10323946f4 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -13,6 +13,7 @@ import { } from '.'; import { ChildProcess } from 'child_process'; +import * as PCancelable from 'p-cancelable'; export class ActiveExecutions { @@ -30,7 +31,7 @@ export class ActiveExecutions { * @returns {string} * @memberof ActiveExecutions */ - add(process: ChildProcess, executionData: IWorkflowExecutionDataProcess): string { + add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): string { const executionId = this.nextId++; this.activeExecutions[executionId] = { @@ -44,6 +45,22 @@ export class ActiveExecutions { } + /** + * Attaches an execution + * + * @param {string} executionId + * @param {PCancelable} workflowExecution + * @memberof ActiveExecutions + */ + attachWorkflowExecution(executionId: string, workflowExecution: PCancelable) { + if (this.activeExecutions[executionId] === undefined) { + throw new Error(`No active execution with id "${executionId}" got found to attach to workflowExecution to!`); + } + + this.activeExecutions[executionId].workflowExecution = workflowExecution; + } + + /** * Remove an active execution * @@ -82,13 +99,20 @@ export class ActiveExecutions { // In case something goes wrong make sure that promise gets first // returned that it gets then also resolved correctly. - setTimeout(() => { - if (this.activeExecutions[executionId].process.connected) { - this.activeExecutions[executionId].process.send({ - type: 'stopExecution' - }); - } - }, 1); + if (this.activeExecutions[executionId].process !== undefined) { + // Workflow is running in subprocess + setTimeout(() => { + if (this.activeExecutions[executionId].process!.connected) { + this.activeExecutions[executionId].process!.send({ + type: 'stopExecution' + }); + } + + }, 1); + } else { + // Workflow is running in current process + this.activeExecutions[executionId].workflowExecution!.cancel('Canceled by user'); + } return this.getPostExecutePromise(executionId); } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 4000c30785..dbc39dccaf 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -17,6 +17,7 @@ import { } from 'n8n-core'; +import * as PCancelable from 'p-cancelable'; import { ObjectID, Repository } from 'typeorm'; import { ChildProcess } from 'child_process'; @@ -181,9 +182,10 @@ export interface IExecutionDeleteFilter { export interface IExecutingWorkflowData { executionData: IWorkflowExecutionDataProcess; - process: ChildProcess; + process?: ChildProcess; startedAt: Date; postExecutePromises: Array>; + workflowExecution?: PCancelable; } export interface IN8nConfig { diff --git a/packages/cli/src/NodeTypes.ts b/packages/cli/src/NodeTypes.ts index 66b2363bfe..f600e8b734 100644 --- a/packages/cli/src/NodeTypes.ts +++ b/packages/cli/src/NodeTypes.ts @@ -15,7 +15,7 @@ class NodeTypesClass implements INodeTypes { // Some nodeTypes need to get special parameters applied like the // polling nodes the polling times for (const nodeTypeData of Object.values(nodeTypes)) { - const applyParameters = NodeHelpers.getSpecialNodeParameters(nodeTypeData.type) + const applyParameters = NodeHelpers.getSpecialNodeParameters(nodeTypeData.type); if (applyParameters.length) { nodeTypeData.type.description.properties.unshift.apply(nodeTypeData.type.description.properties, applyParameters); diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 8840d1cfb0..fc00e33d30 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -4,6 +4,7 @@ import { ITransferNodeTypes, IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcessWithExecution, + NodeTypes, Push, WorkflowExecuteAdditionalData, WorkflowHelpers, @@ -11,15 +12,19 @@ import { import { IProcessMessage, + WorkflowExecute, } from 'n8n-core'; import { IExecutionError, IRun, + Workflow, WorkflowHooks, WorkflowExecuteMode, } from 'n8n-workflow'; +import * as config from '../config'; +import * as PCancelable from 'p-cancelable'; import { join as pathJoin } from 'path'; import { fork } from 'child_process'; @@ -80,7 +85,7 @@ export class WorkflowRunner { /** - * Run the workflow in subprocess + * Run the workflow * * @param {IWorkflowExecutionDataProcess} data * @param {boolean} [loadStaticData] If set will the static data be loaded from @@ -89,6 +94,70 @@ export class WorkflowRunner { * @memberof WorkflowRunner */ async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { + const sameProcessExecution = config.get('executions.sameProcessExecution') as boolean; + if (sameProcessExecution === true) { + return this.runSameProcess(data, loadStaticData); + } + + return this.runSubprocess(data, loadStaticData); + } + + + /** + * Run the workflow in current process + * + * @param {IWorkflowExecutionDataProcess} data + * @param {boolean} [loadStaticData] If set will the static data be loaded from + * the workflow and added to input data + * @returns {Promise} + * @memberof WorkflowRunner + */ + async runSameProcess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { + if (loadStaticData === true && data.workflowData.id) { + data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(data.workflowData.id as string); + } + + const nodeTypes = NodeTypes(); + + const workflow = new Workflow(data.workflowData.id as string | undefined, data.workflowData!.nodes, data.workflowData!.connections, data.workflowData!.active, nodeTypes, data.workflowData!.staticData); + const additionalData = await WorkflowExecuteAdditionalData.getBase(data.credentials); + + // Register the active execution + const executionId = this.activeExecutions.add(data, undefined); + + additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); + + let workflowExecution: PCancelable; + if (data.executionData !== undefined) { + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode, data.executionData); + workflowExecution = workflowExecute.processRunExecutionData(workflow); + } else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) { + // Execute all nodes + + // Can execute without webhook so go on + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode); + } else { + // Execute only the nodes between start and destination nodes + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + workflowExecution = workflowExecute.runPartialWorkflow(workflow, data.runData, data.startNodes, data.destinationNode); + } + + this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); + + return executionId; + } + + /** + * Run the workflow + * + * @param {IWorkflowExecutionDataProcess} data + * @param {boolean} [loadStaticData] If set will the static data be loaded from + * the workflow and added to input data + * @returns {Promise} + * @memberof WorkflowRunner + */ + async runSubprocess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { const startedAt = new Date(); const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); @@ -97,7 +166,7 @@ export class WorkflowRunner { } // Register the active execution - const executionId = this.activeExecutions.add(subprocess, data); + const executionId = this.activeExecutions.add(data, subprocess); // Check if workflow contains a "executeWorkflow" Node as in this // case we can not know which nodeTypes will be needed and so have diff --git a/packages/core/package.json b/packages/core/package.json index 03f4cdf0fe..1c8c831e91 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -44,6 +44,7 @@ "lodash.get": "^4.4.2", "mmmagic": "^0.5.2", "n8n-workflow": "~0.20.0", + "p-cancelable": "^2.0.0", "request-promise-native": "^1.0.7" }, "jest": { diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index d8ca8cd50c..bff9ed6f4e 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -1,3 +1,5 @@ +import * as PCancelable from 'p-cancelable'; + import { IConnection, IDataObject, @@ -54,7 +56,7 @@ export class WorkflowExecute { * @returns {(Promise)} * @memberof WorkflowExecute */ - async run(workflow: Workflow, startNode?: INode, destinationNode?: string): Promise { + run(workflow: Workflow, startNode?: INode, destinationNode?: string): PCancelable { // Get the nodes to start workflow execution from startNode = startNode || workflow.getStartNode(destinationNode); @@ -115,7 +117,8 @@ export class WorkflowExecute { * @returns {(Promise)} * @memberof WorkflowExecute */ - async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise { + // @ts-ignore + async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): PCancelable { let incomingNodeConnections: INodeConnections | undefined; let connection: IConnection; @@ -209,7 +212,7 @@ export class WorkflowExecute { }, }; - return await this.processRunExecutionData(workflow); + return this.processRunExecutionData(workflow); } @@ -444,7 +447,7 @@ export class WorkflowExecute { * @returns {Promise} * @memberof WorkflowExecute */ - async processRunExecutionData(workflow: Workflow): Promise { + processRunExecutionData(workflow: Workflow): PCancelable { const startedAt = new Date(); const workflowIssues = workflow.checkReadyForExecution(); @@ -470,231 +473,255 @@ export class WorkflowExecute { let currentExecutionTry = ''; let lastExecutionTry = ''; - return (async () => { - executionLoop: - while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) { - nodeSuccessData = null; - executionError = undefined; - executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData; - executionNode = executionData.node; + return new PCancelable((resolve, reject, onCancel) => { + let gotCancel = false; - this.executeHook('nodeExecuteBefore', [executionNode.name]); + onCancel.shouldReject = false; + onCancel(() => { + console.log('got cancellled'); - // Get the index of the current run - runIndex = 0; - if (this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { - runIndex = this.runExecutionData.resultData.runData[executionNode.name].length; - } + gotCancel = true; + }); - currentExecutionTry = `${executionNode.name}:${runIndex}`; + const returnPromise = (async () => { - if (currentExecutionTry === lastExecutionTry) { - throw new Error('Did stop execution because execution seems to be in endless loop.'); - } + executionLoop: + while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) { - if (this.runExecutionData.startData!.runNodeFilter !== undefined && this.runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) { - // If filter is set and node is not on filter skip it, that avoids the problem that it executes - // leafs that are parallel to a selected destinationNode. Normally it would execute them because - // they have the same parent and it executes all child nodes. - continue; - } - - // Check if all the data which is needed to run the node is available - if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) { - // Check if the node has incoming connections - if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) { - let inputConnections: IConnection[][]; - let connectionIndex: number; - - inputConnections = workflow.connectionsByDestinationNode[executionNode.name]['main']; - - for (connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) { - if (workflow.getHighestNode(executionNode.name, 'main', connectionIndex).length === 0) { - // If there is no valid incoming node (if all are disabled) - // then ignore that it has inputs and simply execute it as it is without - // any data - continue; - } - - if (!executionData.data!.hasOwnProperty('main')) { - // ExecutionData does not even have the connection set up so can - // not have that data, so add it again to be executed later - this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); - lastExecutionTry = currentExecutionTry; - continue executionLoop; - } - - // Check if it has the data for all the inputs - // The most nodes just have one but merge node for example has two and data - // of both inputs has to be available to be able to process the node. - if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) { - // Does not have the data of the connections so add back to stack - this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); - lastExecutionTry = currentExecutionTry; - continue executionLoop; - } - } + // @ts-ignore + if (gotCancel === true) { + return Promise.resolve(); } - } - // Clone input data that nodes can not mess up data of parallel nodes which receive the same data - // TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned - // is very slow so only do if needed - startTime = new Date().getTime(); + nodeSuccessData = null; + executionError = undefined; + executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData; + executionNode = executionData.node; - let maxTries = 1; - if (executionData.node.retryOnFail === true) { - // TODO: Remove the hardcoded default-values here and also in NodeSettings.vue - maxTries = Math.min(5, Math.max(2, executionData.node.maxTries || 3)); - } + this.executeHook('nodeExecuteBefore', [executionNode.name]); - let waitBetweenTries = 0; - if (executionData.node.retryOnFail === true) { - // TODO: Remove the hardcoded default-values here and also in NodeSettings.vue - waitBetweenTries = Math.min(5000, Math.max(0, executionData.node.waitBetweenTries || 1000)); - } - - for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) { - try { - - if (tryIndex !== 0) { - // Reset executionError from previous error try - executionError = undefined; - if (waitBetweenTries !== 0) { - // TODO: Improve that in the future and check if other nodes can - // be executed in the meantime - await new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, waitBetweenTries); - }); - } - } - - this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name; - nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode); - - if (nodeSuccessData === null) { - // If null gets returned it means that the node did succeed - // but did not have any data. So the branch should end - // (meaning the nodes afterwards should not be processed) - continue executionLoop; - } - - break; - } catch (error) { - executionError = { - message: error.message, - stack: error.stack, - }; + // Get the index of the current run + runIndex = 0; + if (this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { + runIndex = this.runExecutionData.resultData.runData[executionNode.name].length; } - } - // Add the data to return to the user - // (currently does not get cloned as data does not get changed, maybe later we should do that?!?!) + currentExecutionTry = `${executionNode.name}:${runIndex}`; - if (!this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { - this.runExecutionData.resultData.runData[executionNode.name] = []; - } - taskData = { - startTime, - executionTime: (new Date().getTime()) - startTime - }; - - if (executionError !== undefined) { - taskData.error = executionError; - - if (executionData.node.continueOnFail === true) { - // Workflow should continue running even if node errors - if (executionData.data.hasOwnProperty('main') && executionData.data.main.length > 0) { - // Simply get the input data of the node if it has any and pass it through - // to the next node - if (executionData.data.main[0] !== null) { - nodeSuccessData = [executionData.data.main[0] as INodeExecutionData[]]; - } - } - } else { - // Node execution did fail so add error and stop execution - this.runExecutionData.resultData.runData[executionNode.name].push(taskData); - - // Add the execution data again so that it can get restarted - this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); - - this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); - - break; + if (currentExecutionTry === lastExecutionTry) { + throw new Error('Did stop execution because execution seems to be in endless loop.'); } - } - // Node executed successfully. So add data and go on. - taskData.data = ({ - 'main': nodeSuccessData - } as ITaskDataConnections); + if (this.runExecutionData.startData!.runNodeFilter !== undefined && this.runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) { + // If filter is set and node is not on filter skip it, that avoids the problem that it executes + // leafs that are parallel to a selected destinationNode. Normally it would execute them because + // they have the same parent and it executes all child nodes. + continue; + } - this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); + // Check if all the data which is needed to run the node is available + if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) { + // Check if the node has incoming connections + if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) { + let inputConnections: IConnection[][]; + let connectionIndex: number; - this.runExecutionData.resultData.runData[executionNode.name].push(taskData); + inputConnections = workflow.connectionsByDestinationNode[executionNode.name]['main']; - if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) { - // If destination node is defined and got executed stop execution - continue; - } - - // Add the nodes to which the current node has an output connection to that they can - // be executed next - if (workflow.connectionsBySourceNode.hasOwnProperty(executionNode.name)) { - if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) { - let outputIndex: string, connectionData: IConnection; - // Go over all the different - - // Add the nodes to be executed - for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) { - if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) { - continue; - } - - // Go through all the different outputs of this connection - for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) { - if (!workflow.nodes.hasOwnProperty(connectionData.node)) { - return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`)); + for (connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) { + if (workflow.getHighestNode(executionNode.name, 'main', connectionIndex).length === 0) { + // If there is no valid incoming node (if all are disabled) + // then ignore that it has inputs and simply execute it as it is without + // any data + continue; } - this.addNodeToBeExecuted(workflow, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex); + if (!executionData.data!.hasOwnProperty('main')) { + // ExecutionData does not even have the connection set up so can + // not have that data, so add it again to be executed later + this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); + lastExecutionTry = currentExecutionTry; + continue executionLoop; + } + + // Check if it has the data for all the inputs + // The most nodes just have one but merge node for example has two and data + // of both inputs has to be available to be able to process the node. + if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) { + // Does not have the data of the connections so add back to stack + this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); + lastExecutionTry = currentExecutionTry; + continue executionLoop; + } + } + } + } + + // Clone input data that nodes can not mess up data of parallel nodes which receive the same data + // TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned + // is very slow so only do if needed + startTime = new Date().getTime(); + + let maxTries = 1; + if (executionData.node.retryOnFail === true) { + // TODO: Remove the hardcoded default-values here and also in NodeSettings.vue + maxTries = Math.min(5, Math.max(2, executionData.node.maxTries || 3)); + } + + let waitBetweenTries = 0; + if (executionData.node.retryOnFail === true) { + // TODO: Remove the hardcoded default-values here and also in NodeSettings.vue + waitBetweenTries = Math.min(5000, Math.max(0, executionData.node.waitBetweenTries || 1000)); + } + + for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) { + // @ts-ignore + if (gotCancel === true) { + return Promise.resolve(); + } + try { + + if (tryIndex !== 0) { + // Reset executionError from previous error try + executionError = undefined; + if (waitBetweenTries !== 0) { + // TODO: Improve that in the future and check if other nodes can + // be executed in the meantime + await new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, waitBetweenTries); + }); + } + } + + this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name; + nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode); + + if (nodeSuccessData === null) { + // If null gets returned it means that the node did succeed + // but did not have any data. So the branch should end + // (meaning the nodes afterwards should not be processed) + continue executionLoop; + } + + break; + } catch (error) { + executionError = { + message: error.message, + stack: error.stack, + }; + } + } + + // Add the data to return to the user + // (currently does not get cloned as data does not get changed, maybe later we should do that?!?!) + + if (!this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { + this.runExecutionData.resultData.runData[executionNode.name] = []; + } + taskData = { + startTime, + executionTime: (new Date().getTime()) - startTime + }; + + if (executionError !== undefined) { + taskData.error = executionError; + + if (executionData.node.continueOnFail === true) { + // Workflow should continue running even if node errors + if (executionData.data.hasOwnProperty('main') && executionData.data.main.length > 0) { + // Simply get the input data of the node if it has any and pass it through + // to the next node + if (executionData.data.main[0] !== null) { + nodeSuccessData = [executionData.data.main[0] as INodeExecutionData[]]; + } + } + } else { + // Node execution did fail so add error and stop execution + this.runExecutionData.resultData.runData[executionNode.name].push(taskData); + + // Add the execution data again so that it can get restarted + this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); + + this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); + + break; + } + } + + // Node executed successfully. So add data and go on. + taskData.data = ({ + 'main': nodeSuccessData + } as ITaskDataConnections); + + this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); + + this.runExecutionData.resultData.runData[executionNode.name].push(taskData); + + if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) { + // If destination node is defined and got executed stop execution + continue; + } + + // Add the nodes to which the current node has an output connection to that they can + // be executed next + if (workflow.connectionsBySourceNode.hasOwnProperty(executionNode.name)) { + if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) { + let outputIndex: string, connectionData: IConnection; + // Go over all the different + + // Add the nodes to be executed + for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) { + if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) { + continue; + } + + // Go through all the different outputs of this connection + for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) { + if (!workflow.nodes.hasOwnProperty(connectionData.node)) { + return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`)); + } + + this.addNodeToBeExecuted(workflow, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex); + } } } } } - } - return Promise.resolve(); - })() - .then(async () => { - return this.processSuccessExecution(startedAt, workflow, executionError); - }) - .catch(async (error) => { - const fullRunData = this.getFullRunData(startedAt); + return Promise.resolve(); + })() + .then(async () => { + return this.processSuccessExecution(startedAt, workflow, executionError); + }) + .catch(async (error) => { + const fullRunData = this.getFullRunData(startedAt); - fullRunData.data.resultData.error = { - message: error.message, - stack: error.stack, - }; + fullRunData.data.resultData.error = { + message: error.message, + stack: error.stack, + }; - // Check if static data changed - let newStaticData: IDataObject | undefined; - if (workflow.staticData.__dataChanged === true) { - // Static data of workflow changed - newStaticData = workflow.staticData; - } + // Check if static data changed + let newStaticData: IDataObject | undefined; + if (workflow.staticData.__dataChanged === true) { + // Static data of workflow changed + newStaticData = workflow.staticData; + } - await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); - return fullRunData; + return fullRunData; + }); + + return returnPromise.then(resolve); }); - } - async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: IExecutionError): Promise { + // @ts-ignore + async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: IExecutionError): PCancelable { const fullRunData = this.getFullRunData(startedAt); if (executionError !== undefined) {