Add possibility to execute workflows in same process

Jan Oberhauser 2020-01-17 19:34:31 -06:00
parent 3fe47ab89e
commit 95cb1b2788
9 changed files with 353 additions and 209 deletions

View file

@@ -77,6 +77,20 @@ These settings can also be overwritten on a per workflow basis in the workflow
 settings in the Editor UI.
 
+## Execute In Same Process
+
+All workflows get executed in their own separate process. This ensures that all CPU cores
+get used and that executions do not block each other on CPU-intensive tasks. It also means
+that the crash of one execution does not take down the whole application. The disadvantage,
+however, is that starting a separate process slows down the start time considerably and uses
+much more memory. So if the workflows are not CPU intensive and have to start very fast, it
+is possible to run them all directly in the main process with this setting:
+
+```bash
+export EXECUTIONS_SAME_PROCESS=true
+```
+
 ## Exclude Nodes
 
 It is possible to not allow users to use nodes of a specific node type. If you, for example,

View file

@@ -84,6 +84,12 @@ const config = convict({
 			default: false,
 			env: 'EXECUTIONS_DATA_SAVE_MANUAL_EXECUTIONS'
 		},
+		sameProcessExecution: {
+			doc: 'Executes the workflows in the same process instead of in a separate one',
+			default: false,
+			env: 'EXECUTIONS_SAME_PROCESS'
+		},
 	},
 
 	generic: {
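
For reference, the new option is read elsewhere in this commit with convict's `config.get`; a minimal sketch of that lookup (the relative import path is an assumption taken from the WorkflowRunner change further below):

```typescript
import * as config from '../config';

// Resolves to false unless EXECUTIONS_SAME_PROCESS=true is set in the
// environment (or overridden in a config file), matching the schema
// entry added above.
const sameProcessExecution = config.get('executions.sameProcessExecution') as boolean;

if (sameProcessExecution === true) {
	// run the workflow directly in the main process
}
```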

View file

@@ -66,6 +66,7 @@
 		"@types/request-promise-native": "^1.0.15",
 		"jest": "^24.9.0",
 		"nodemon": "^2.0.2",
+		"p-cancelable": "^2.0.0",
 		"run-script-os": "^1.0.7",
 		"ts-jest": "^24.0.2",
 		"tslint": "^5.17.0",

View file

@@ -13,6 +13,7 @@ import {
 } from '.';
 
 import { ChildProcess } from 'child_process';
+import * as PCancelable from 'p-cancelable';
 
 export class ActiveExecutions {
@@ -30,7 +31,7 @@ export class ActiveExecutions {
	 * @returns {string}
	 * @memberof ActiveExecutions
	 */
-	add(process: ChildProcess, executionData: IWorkflowExecutionDataProcess): string {
+	add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): string {
 		const executionId = this.nextId++;
 
 		this.activeExecutions[executionId] = {
@@ -44,6 +45,22 @@ export class ActiveExecutions {
 	}
 
+	/**
+	 * Attaches an execution
+	 *
+	 * @param {string} executionId
+	 * @param {PCancelable<IRun>} workflowExecution
+	 * @memberof ActiveExecutions
+	 */
+	attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
+		if (this.activeExecutions[executionId] === undefined) {
+			throw new Error(`No active execution with id "${executionId}" got found to attach to workflowExecution to!`);
+		}
+
+		this.activeExecutions[executionId].workflowExecution = workflowExecution;
+	}
+
 	/**
 	 * Remove an active execution
 	 *
@@ -82,13 +99,20 @@ export class ActiveExecutions {
 		// In case something goes wrong make sure that promise gets first
 		// returned that it gets then also resolved correctly.
-		setTimeout(() => {
-			if (this.activeExecutions[executionId].process.connected) {
-				this.activeExecutions[executionId].process.send({
-					type: 'stopExecution'
-				});
-			}
-		}, 1);
+		if (this.activeExecutions[executionId].process !== undefined) {
+			// Workflow is running in subprocess
+			setTimeout(() => {
+				if (this.activeExecutions[executionId].process!.connected) {
+					this.activeExecutions[executionId].process!.send({
+						type: 'stopExecution'
+					});
+				}
+			}, 1);
+		} else {
+			// Workflow is running in current process
+			this.activeExecutions[executionId].workflowExecution!.cancel('Canceled by user');
+		}
 
 		return this.getPostExecutePromise(executionId);
 	}
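
The same-process branch of `stopExecution` relies on the `p-cancelable` package added in this commit. A minimal, standalone sketch of the parts of its API used here (illustrative only, not code from the commit):

```typescript
import * as PCancelable from 'p-cancelable';

// The executor receives an `onCancel` hook in addition to resolve/reject.
const task = new PCancelable<string>((resolve, reject, onCancel) => {
	// With shouldReject = false, calling .cancel() does not reject the promise;
	// the executor is expected to notice the cancellation and resolve on its own.
	onCancel.shouldReject = false;

	const timer = setTimeout(() => resolve('done'), 10000);

	onCancel(() => {
		clearTimeout(timer);
		resolve('canceled');
	});
});

// Elsewhere, e.g. when a user stops the execution:
task.cancel('Canceled by user');
```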

View file

@@ -17,6 +17,7 @@ import {
 } from 'n8n-core';
 
+import * as PCancelable from 'p-cancelable';
 import { ObjectID, Repository } from 'typeorm';
 
 import { ChildProcess } from 'child_process';
@@ -181,9 +182,10 @@ export interface IExecutionDeleteFilter {
 
 export interface IExecutingWorkflowData {
 	executionData: IWorkflowExecutionDataProcess;
-	process: ChildProcess;
+	process?: ChildProcess;
 	startedAt: Date;
 	postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
+	workflowExecution?: PCancelable<IRun>;
 }
 
 export interface IN8nConfig {

View file

@@ -15,7 +15,7 @@ class NodeTypesClass implements INodeTypes {
 		// Some nodeTypes need to get special parameters applied like the
 		// polling nodes the polling times
 		for (const nodeTypeData of Object.values(nodeTypes)) {
-			const applyParameters = NodeHelpers.getSpecialNodeParameters(nodeTypeData.type)
+			const applyParameters = NodeHelpers.getSpecialNodeParameters(nodeTypeData.type);
 
 			if (applyParameters.length) {
 				nodeTypeData.type.description.properties.unshift.apply(nodeTypeData.type.description.properties, applyParameters);

View file

@@ -4,6 +4,7 @@ import {
 	ITransferNodeTypes,
 	IWorkflowExecutionDataProcess,
 	IWorkflowExecutionDataProcessWithExecution,
+	NodeTypes,
 	Push,
 	WorkflowExecuteAdditionalData,
 	WorkflowHelpers,
@@ -11,15 +12,19 @@
 
 import {
 	IProcessMessage,
+	WorkflowExecute,
 } from 'n8n-core';
 
 import {
 	IExecutionError,
 	IRun,
+	Workflow,
 	WorkflowHooks,
 	WorkflowExecuteMode,
 } from 'n8n-workflow';
 
+import * as config from '../config';
+import * as PCancelable from 'p-cancelable';
+
 import { join as pathJoin } from 'path';
 import { fork } from 'child_process';
@@ -80,7 +85,7 @@ export class WorkflowRunner {
 
 	/**
-	 * Run the workflow in subprocess
+	 * Run the workflow
 	 *
 	 * @param {IWorkflowExecutionDataProcess} data
 	 * @param {boolean} [loadStaticData] If set will the static data be loaded from
@@ -89,6 +94,70 @@
 	 * @memberof WorkflowRunner
 	 */
 	async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
+		const sameProcessExecution = config.get('executions.sameProcessExecution') as boolean;
+		if (sameProcessExecution === true) {
+			return this.runSameProcess(data, loadStaticData);
+		}
+
+		return this.runSubprocess(data, loadStaticData);
+	}
+
+	/**
+	 * Run the workflow in current process
+	 *
+	 * @param {IWorkflowExecutionDataProcess} data
+	 * @param {boolean} [loadStaticData] If set will the static data be loaded from
+	 *                                   the workflow and added to input data
+	 * @returns {Promise<string>}
+	 * @memberof WorkflowRunner
+	 */
+	async runSameProcess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
+		if (loadStaticData === true && data.workflowData.id) {
+			data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(data.workflowData.id as string);
+		}
+
+		const nodeTypes = NodeTypes();
+
+		const workflow = new Workflow(data.workflowData.id as string | undefined, data.workflowData!.nodes, data.workflowData!.connections, data.workflowData!.active, nodeTypes, data.workflowData!.staticData);
+		const additionalData = await WorkflowExecuteAdditionalData.getBase(data.credentials);
+
+		// Register the active execution
+		const executionId = this.activeExecutions.add(data, undefined);
+
+		additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
+
+		let workflowExecution: PCancelable<IRun>;
+		if (data.executionData !== undefined) {
+			const workflowExecute = new WorkflowExecute(additionalData, data.executionMode, data.executionData);
+			workflowExecution = workflowExecute.processRunExecutionData(workflow);
+		} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
+			// Execute all nodes
+			// Can execute without webhook so go on
+			const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
+			workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
+		} else {
+			// Execute only the nodes between start and destination nodes
+			const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
+			workflowExecution = workflowExecute.runPartialWorkflow(workflow, data.runData, data.startNodes, data.destinationNode);
+		}
+
+		this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
+
+		return executionId;
+	}
+
+	/**
+	 * Run the workflow in subprocess
+	 *
+	 * @param {IWorkflowExecutionDataProcess} data
+	 * @param {boolean} [loadStaticData] If set will the static data be loaded from
+	 *                                   the workflow and added to input data
+	 * @returns {Promise<string>}
+	 * @memberof WorkflowRunner
+	 */
+	async runSubprocess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
 		const startedAt = new Date();
 		const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js'));
@@ -97,7 +166,7 @@ export class WorkflowRunner {
 		}
 
 		// Register the active execution
-		const executionId = this.activeExecutions.add(subprocess, data);
+		const executionId = this.activeExecutions.add(data, subprocess);
 
 		// Check if workflow contains a "executeWorkflow" Node as in this
 		// case we can not know which nodeTypes will be needed and so have
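
Call sites are unaffected by the new branch: they keep calling `run()` and get back an execution id either way. A hedged sketch of such a caller (the barrel import, the no-argument constructor, and the prepared `data` object are assumptions, not part of this commit):

```typescript
import { IWorkflowExecutionDataProcess, WorkflowRunner } from '.';

// `data` is assumed to be prepared elsewhere (workflow, execution mode, credentials).
async function startExecution(data: IWorkflowExecutionDataProcess): Promise<string> {
	const workflowRunner = new WorkflowRunner();

	// Depending on EXECUTIONS_SAME_PROCESS this either forks
	// WorkflowRunnerProcess.js or runs the workflow in the main process;
	// the returned execution id is the same in both cases.
	const executionId = await workflowRunner.run(data, true);

	return executionId;
}
```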

View file

@@ -44,6 +44,7 @@
 		"lodash.get": "^4.4.2",
 		"mmmagic": "^0.5.2",
 		"n8n-workflow": "~0.20.0",
+		"p-cancelable": "^2.0.0",
 		"request-promise-native": "^1.0.7"
 	},
 	"jest": {

View file

@@ -1,3 +1,5 @@
+import * as PCancelable from 'p-cancelable';
+
 import {
 	IConnection,
 	IDataObject,
@@ -54,7 +56,7 @@ export class WorkflowExecute {
	 * @returns {(Promise<string>)}
	 * @memberof WorkflowExecute
	 */
-	async run(workflow: Workflow, startNode?: INode, destinationNode?: string): Promise<IRun> {
+	run(workflow: Workflow, startNode?: INode, destinationNode?: string): PCancelable<IRun> {
 		// Get the nodes to start workflow execution from
 		startNode = startNode || workflow.getStartNode(destinationNode);
@@ -115,7 +117,8 @@
	 * @returns {(Promise<string>)}
	 * @memberof WorkflowExecute
	 */
-	async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise<IRun> {
+	// @ts-ignore
+	async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): PCancelable<IRun> {
 		let incomingNodeConnections: INodeConnections | undefined;
 		let connection: IConnection;
@@ -209,7 +212,7 @@
 			},
 		};
 
-		return await this.processRunExecutionData(workflow);
+		return this.processRunExecutionData(workflow);
 	}
@@ -444,7 +447,7 @@
	 * @returns {Promise<string>}
	 * @memberof WorkflowExecute
	 */
-	async processRunExecutionData(workflow: Workflow): Promise<IRun> {
+	processRunExecutionData(workflow: Workflow): PCancelable<IRun> {
 		const startedAt = new Date();
 
 		const workflowIssues = workflow.checkReadyForExecution();
@@ -470,231 +473,255 @@
 		let currentExecutionTry = '';
 		let lastExecutionTry = '';
 
-		return (async () => {
+		return new PCancelable((resolve, reject, onCancel) => {
+			let gotCancel = false;
+
+			onCancel.shouldReject = false;
+			onCancel(() => {
+				console.log('got cancellled');
+				gotCancel = true;
+			});
+
+			const returnPromise = (async () => {
 				executionLoop:
 				while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) {
+
+					// @ts-ignore
+					if (gotCancel === true) {
+						return Promise.resolve();
+					}
+
 					nodeSuccessData = null;
 					executionError = undefined;
 					executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
 					executionNode = executionData.node;
 
 					this.executeHook('nodeExecuteBefore', [executionNode.name]);
 
 					// Get the index of the current run
 					runIndex = 0;
 					if (this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
 						runIndex = this.runExecutionData.resultData.runData[executionNode.name].length;
 					}
 
 					currentExecutionTry = `${executionNode.name}:${runIndex}`;
 
 					if (currentExecutionTry === lastExecutionTry) {
 						throw new Error('Did stop execution because execution seems to be in endless loop.');
 					}
 
 					if (this.runExecutionData.startData!.runNodeFilter !== undefined && this.runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) {
 						// If filter is set and node is not on filter skip it, that avoids the problem that it executes
 						// leafs that are parallel to a selected destinationNode. Normally it would execute them because
 						// they have the same parent and it executes all child nodes.
 						continue;
 					}
 
 					// Check if all the data which is needed to run the node is available
 					if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) {
 						// Check if the node has incoming connections
 						if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) {
 							let inputConnections: IConnection[][];
 							let connectionIndex: number;
 
 							inputConnections = workflow.connectionsByDestinationNode[executionNode.name]['main'];
 
 							for (connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) {
 								if (workflow.getHighestNode(executionNode.name, 'main', connectionIndex).length === 0) {
 									// If there is no valid incoming node (if all are disabled)
 									// then ignore that it has inputs and simply execute it as it is without
 									// any data
 									continue;
 								}
 
 								if (!executionData.data!.hasOwnProperty('main')) {
 									// ExecutionData does not even have the connection set up so can
 									// not have that data, so add it again to be executed later
 									this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
 									lastExecutionTry = currentExecutionTry;
 									continue executionLoop;
 								}
 
 								// Check if it has the data for all the inputs
 								// The most nodes just have one but merge node for example has two and data
 								// of both inputs has to be available to be able to process the node.
 								if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) {
 									// Does not have the data of the connections so add back to stack
 									this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
 									lastExecutionTry = currentExecutionTry;
 									continue executionLoop;
 								}
 							}
 						}
 					}
 
 					// Clone input data that nodes can not mess up data of parallel nodes which receive the same data
 					// TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned
 					// is very slow so only do if needed
 
 					startTime = new Date().getTime();
 
 					let maxTries = 1;
 					if (executionData.node.retryOnFail === true) {
 						// TODO: Remove the hardcoded default-values here and also in NodeSettings.vue
 						maxTries = Math.min(5, Math.max(2, executionData.node.maxTries || 3));
 					}
 
 					let waitBetweenTries = 0;
 					if (executionData.node.retryOnFail === true) {
 						// TODO: Remove the hardcoded default-values here and also in NodeSettings.vue
 						waitBetweenTries = Math.min(5000, Math.max(0, executionData.node.waitBetweenTries || 1000));
 					}
 
 					for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
+						// @ts-ignore
+						if (gotCancel === true) {
+							return Promise.resolve();
+						}
+
 						try {
 
 							if (tryIndex !== 0) {
 								// Reset executionError from previous error try
 								executionError = undefined;
 								if (waitBetweenTries !== 0) {
 									// TODO: Improve that in the future and check if other nodes can
 									// be executed in the meantime
 									await new Promise((resolve) => {
 										setTimeout(() => {
 											resolve();
 										}, waitBetweenTries);
 									});
 								}
 							}
 
 							this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
 							nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode);
 
 							if (nodeSuccessData === null) {
 								// If null gets returned it means that the node did succeed
 								// but did not have any data. So the branch should end
 								// (meaning the nodes afterwards should not be processed)
 								continue executionLoop;
 							}
 
 							break;
 						} catch (error) {
 							executionError = {
 								message: error.message,
 								stack: error.stack,
 							};
 						}
 					}
 
 					// Add the data to return to the user
 					// (currently does not get cloned as data does not get changed, maybe later we should do that?!?!)
 					if (!this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
 						this.runExecutionData.resultData.runData[executionNode.name] = [];
 					}
 
 					taskData = {
 						startTime,
 						executionTime: (new Date().getTime()) - startTime
 					};
 
 					if (executionError !== undefined) {
 						taskData.error = executionError;
 
 						if (executionData.node.continueOnFail === true) {
 							// Workflow should continue running even if node errors
 							if (executionData.data.hasOwnProperty('main') && executionData.data.main.length > 0) {
 								// Simply get the input data of the node if it has any and pass it through
 								// to the next node
 								if (executionData.data.main[0] !== null) {
 									nodeSuccessData = [executionData.data.main[0] as INodeExecutionData[]];
 								}
 							}
 						} else {
 							// Node execution did fail so add error and stop execution
 							this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
 
 							// Add the execution data again so that it can get restarted
 							this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
 
 							this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]);
 
 							break;
 						}
 					}
 
 					// Node executed successfully. So add data and go on.
 					taskData.data = ({
 						'main': nodeSuccessData
 					} as ITaskDataConnections);
 
 					this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]);
 
 					this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
 
 					if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) {
 						// If destination node is defined and got executed stop execution
 						continue;
 					}
 
 					// Add the nodes to which the current node has an output connection to that they can
 					// be executed next
 					if (workflow.connectionsBySourceNode.hasOwnProperty(executionNode.name)) {
 						if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) {
 							let outputIndex: string, connectionData: IConnection;
 							// Go over all the different
 							// Add the nodes to be executed
 							for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) {
 								if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) {
 									continue;
 								}
 
 								// Go through all the different outputs of this connection
 								for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) {
 									if (!workflow.nodes.hasOwnProperty(connectionData.node)) {
 										return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`));
 									}
 
 									this.addNodeToBeExecuted(workflow, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex);
 								}
 							}
 						}
 					}
 				}
 
 				return Promise.resolve();
 			})()
 			.then(async () => {
 				return this.processSuccessExecution(startedAt, workflow, executionError);
 			})
 			.catch(async (error) => {
 				const fullRunData = this.getFullRunData(startedAt);
 
 				fullRunData.data.resultData.error = {
 					message: error.message,
 					stack: error.stack,
 				};
 
 				// Check if static data changed
 				let newStaticData: IDataObject | undefined;
 				if (workflow.staticData.__dataChanged === true) {
 					// Static data of workflow changed
 					newStaticData = workflow.staticData;
 				}
 
 				await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
 
 				return fullRunData;
 			});
+
+			return returnPromise.then(resolve);
+		});
 	}
 
-	async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: IExecutionError): Promise<IRun> {
+	// @ts-ignore
+	async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: IExecutionError): PCancelable<IRun> {
 		const fullRunData = this.getFullRunData(startedAt);
 
 		if (executionError !== undefined) {
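
Taken together, a caller of the now-cancelable `processRunExecutionData` might look like the following sketch. The wiring is illustrative (names and setup are assumptions), but the `cancel('Canceled by user')` call and the `onCancel.shouldReject = false` behaviour are the ones introduced above:

```typescript
import { WorkflowExecute } from 'n8n-core';
import { IRun, Workflow } from 'n8n-workflow';

async function runAndMaybeCancel(workflowExecute: WorkflowExecute, workflow: Workflow): Promise<IRun> {
	// Now returns PCancelable<IRun> instead of Promise<IRun>.
	const workflowExecution = workflowExecute.processRunExecutionData(workflow);

	// This is what ActiveExecutions.stopExecution does for in-process executions,
	// e.g. when the user clicks stop. Because onCancel.shouldReject is false,
	// cancelling does not reject; the execution winds down and the promise
	// resolves with the run data collected so far.
	setTimeout(() => workflowExecution.cancel('Canceled by user'), 60000);

	// A PCancelable is thenable, so awaiting it still yields the IRun result.
	return await workflowExecution;
}
```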