mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
5a179cd5ae
* refactor saving * refactor api layer to be stateless * refactor header details * set variable for menu height * clean up scss * clean up indentation * clean up dropdown impl * refactor no tags view * split away header * Fix tslint issues * Refactor tag manager * add tags to patch request * clean up scss * ⚡ Refactor types to entities * fix issues * update no workflow error * clean up tagscontainer * use getters instead of state * remove imports * use custom colors * clean up tags container * clean up dropdown * clean up focusoncreate * ⚡ Ignore mistaken ID in POST /workflows * ⚡ Fix undefined tag ID in PATCH /workflows * ⚡ Shorten response for POST /tags * remove scss mixins * clean up imports * ⚡ Implement validation with class-validator * address ivan's comments * implement modals * Fix lint issues * fix disabling shortcuts * fix focus issues * fix focus issues * fix focus issues with modal * fix linting issues * use dispatch * use constants for modal keys * fix focus * fix lint issues * remove unused prop * add modal root * fix lint issues * remove unused methods * fix shortcut * remove max width * ⚡ Fix duplicate entry error for pg and MySQL * update rename messaging * update order of buttons * fix firefox overflow on windows * fix dropdown height * 🔨 refactor tag crud controllers * 🧹 remove unused imports * use variable for number of items * fix dropdown spacing * ⚡ Restore type to fix build * ⚡ Fix post-refactor PATCH /workflows/:id * ⚡ Fix PATCH /workflows/:id for zero tags * ⚡ Fix usage count becoming stringified * address max's comments * fix filter spacing * fix blur bug * address most of ivan's comments * address tags type concern * remove defaults * ⚡ return tag id as string * 🔨 add hooks to tag CUD operations * 🏎 simplify timestamp pruning * remove blur event * fix onblur bug * ⚡ Fix fs import to fix build * address max's comments * implement responsive tag container * fix lint issues * update tag limits * address ivan's comments * remove rename, 
refactor header, implement new designs for save, remove responsive tag container * update styling * update styling * implement responsive tag container * implement header tags edit * implement header tags edit * fix lint issues * implement expandable input * minor fixes * minor fixes * use variable * rename save as * duplicate fixes * minor edit fixes * lint fixes * style fixes * hook up saving name * hook up tags * clean up impl * fix dirty state bug * update limit * update notification messages * on click outside * fix minor bug with count * lint fixes * handle minor edge cases * handle minor edge cases * handle minor bugs; fix firefox dropdown issue * Fix min width * apply tags only after api success * remove count fix * clean up workflow tags impl, fix tags delete bug * fix minor issue * fix minor spacing issue * disable wrap for ops * fix viewport root; save on click in dropdown * save button loading when saving name/tags * implement max width on tags container * implement cleaner create experience * disable edit while updating * codacy hex color * refactor tags container * fix clickability * fix workflow open and count * clean up structure * fix up lint issues * fix button size * increase workflow name limit for larger screen * tslint fixes * disable responsiveness for workflow modal * rename event * change min width for tags * clean up pr * address max's comments on styles * remove success toasts * add hover mode to name * minor fixes * refactor name preview * fix name input not to jiggle * finish up name input * Fix up add tags * clean up param * clean up scss * fix resizing name * fix resizing name * fix resize bug * clean up edit spacing * ignore on esc * fix input bug * focus input on clear * build * fix up add tags clickablity * remove scrollbars * move into folders * clean up multiple patch req * remove padding top from edit * update tags on enter * build * rollout blur on enter behavior * rollout esc behavior * fix tags bug when duplicating tags * 
move key to reload tags * update header spacing * build * update hex case * refactor workflow title * remove unusued prop * keep focus on error, fix bug on error * Fix bug with name / tags toggle on error * fix connection push bug * :spakles: Implement wait functionality * 🐛 Do not delete waiting executions with prune * ⚡ Improve SQLite migration to not lose execution data anymore * ⚡ Make it possible to restart waiting execution via webhook * ⚡ Add missing file * 🐛 Some more merge fixes * ⚡ Do not show error for Wait-Nodes if in time-mode * ⚡ Make $executionId available in expressions * 👕 Fix lint issue * 👕 Fix lint issue * 👕 Fix lint issue * ⚡ Set the unlimited sleep time as a variable * ⚡ Add also sleeping webhook path to config * ⚡ Make it possible to retrieve restartUrl in workflow * ⚡ Add authentication to Wait-Node in Webhook-Mode * ⚡ Return 404 when trying to restart execution via webhook which does not support it * ✨ Make it possible to set absolute time on Wait-Node * ⚡ Remove not needed imports * ⚡ Fix description format * ✨ Implement missing webhook features on Wait-Node * ⚡ Display webhook variable in NodeWebhooks * ⚡ Include also date in displayed sleep time * ⚡ Make it possible to see sleep time on node * ⚡ Make sure that no executions does get executed twice * ⚡ Add comment * ⚡ Further improvements * ⚡ Make Wait-Node easier to use * ✨ Add support for "notice" parameter type * Fixing wait node to work with queue, improved logging and execution view * Added support for mysql and pg * ✨ Add support for webhook postfix path * ✨ Make it possible to stop sleeping executions * ⚡ Fix issue with webhook paths in not webhook mode * ⚡ Remove not needed console.log * ⚡ Update TODOs * ⚡ Increase min time of workflow staying active to descrease possible issue with overlap * 👕 Fix lint issue * 🐛 Fix issues with webhooks * ⚡ Make error message clearer * ⚡ Fix issue with missing execution ID in scaling mode * Fixed execution list to correctly display waiting 
executins * Feature: enable webhook wait workflows to continue after specified time * Fixed linting * ⚡ Improve waiting description text * ⚡ Fix parameter display issue and rename * ⚡ Remove comment * ⚡ Do not display webhooks on Wait-Node * Changed wording from restart to resume on wait node * Fixed wording and inconsistent screen when changing resume modes * Removed dots from the descriptions * Changed docs url and renaming postfix to suffix * Changed names from sleep to wait * ⚡ Apply suggestions from ben Co-authored-by: Ben Hesseldieck <1849459+BHesseldieck@users.noreply.github.com> * Some fixes by Ben * ⚡ Remove console.logs * ⚡ Fixes and improvements Co-authored-by: Mutasem <mutdmour@gmail.com> Co-authored-by: Iván Ovejero <ivov.src@gmail.com> Co-authored-by: Ben Hesseldieck <b.hesseldieck@gmail.com> Co-authored-by: Ben Hesseldieck <1849459+BHesseldieck@users.noreply.github.com> Co-authored-by: Omar Ajoue <krynble@gmail.com>
899 lines
31 KiB
TypeScript
899 lines
31 KiB
TypeScript
import * as PCancelable from 'p-cancelable';
|
|
|
|
import {
|
|
ExecutionError,
|
|
IConnection,
|
|
IDataObject,
|
|
IExecuteData,
|
|
INode,
|
|
INodeConnections,
|
|
INodeExecutionData,
|
|
IRun,
|
|
IRunData,
|
|
IRunExecutionData,
|
|
ITaskData,
|
|
ITaskDataConnections,
|
|
IWaitingForExecution,
|
|
IWorkflowExecuteAdditionalData,
|
|
LoggerProxy as Logger,
|
|
Workflow,
|
|
WorkflowExecuteMode,
|
|
WorkflowOperationError,
|
|
} from 'n8n-workflow';
|
|
import {
|
|
NodeExecuteFunctions,
|
|
} from './';
|
|
|
|
import { get } from 'lodash';
|
|
|
|
export class WorkflowExecute {
|
|
runExecutionData: IRunExecutionData;
|
|
private additionalData: IWorkflowExecuteAdditionalData;
|
|
private mode: WorkflowExecuteMode;
|
|
|
|
/**
 * Creates a new workflow executor.
 *
 * @param {IWorkflowExecuteAdditionalData} additionalData Stored on the instance; `executeHook` reads its `hooks` from it
 * @param {WorkflowExecuteMode} mode Stored on the instance and handed to `workflow.runNode` for every node that gets run
 * @param {IRunExecutionData} [runExecutionData] Existing execution state to work on. If omitted an empty state is created
 * @memberof WorkflowExecute
 */
constructor(
	additionalData: IWorkflowExecuteAdditionalData,
	mode: WorkflowExecuteMode,
	runExecutionData?: IRunExecutionData,
) {
	this.mode = mode;
	this.additionalData = additionalData;

	if (runExecutionData) {
		// Continue with the state that got handed in
		this.runExecutionData = runExecutionData;
	} else {
		// Nothing to continue from, so start with a blank state
		this.runExecutionData = {
			startData: {},
			resultData: {
				runData: {},
			},
			executionData: {
				contextData: {},
				nodeExecutionStack: [],
				waitingExecution: {},
			},
		};
	}
}
|
|
|
|
|
|
|
|
/**
 * Executes the given workflow from scratch. Any execution state currently
 * stored on the instance gets replaced.
 *
 * @param {Workflow} workflow The workflow to execute
 * @param {INode} [startNode] Node to start execution from. If omitted `workflow.getStartNode(destinationNode)` is used
 * @param {string} [destinationNode] Name of the node to stop execution at. If set, only that node and
 *                                   the nodes returned by `workflow.getParentNodes` are allowed to run
 * @returns {PCancelable<IRun>} The cancelable promise created by `processRunExecutionData`
 * @throws {Error} If no start node got supplied and none could be found
 * @memberof WorkflowExecute
 */
run(workflow: Workflow, startNode?: INode, destinationNode?: string): PCancelable<IRun> {
	// Fall back to the start node of the workflow if none got supplied
	const firstNode = startNode || workflow.getStartNode(destinationNode);
	if (firstNode === undefined) {
		throw new Error('No node to start the workflow from could be found!');
	}

	// With a destination node only its parent nodes and the node itself may run
	let runNodeFilter: string[] | undefined;
	if (destinationNode) {
		runNodeFilter = workflow.getParentNodes(destinationNode);
		runNodeFilter.push(destinationNode);
	}

	// The first node receives one single empty item as input
	const nodeExecutionStack: IExecuteData[] = [
		{
			node: firstNode,
			data: {
				main: [
					[
						{ json: {} },
					],
				],
			},
		},
	];

	this.runExecutionData = {
		startData: {
			destinationNode,
			runNodeFilter,
		},
		resultData: {
			runData: {},
		},
		executionData: {
			contextData: {},
			nodeExecutionStack,
			waitingExecution: {},
		},
	};

	return this.processRunExecutionData(workflow);
}
|
|
|
|
|
|
|
|
/**
 * Executes only a part of the given workflow: execution begins at the
 * given start nodes, which get fed with the data found in "runData", and
 * is restricted to the destination node and its parent nodes.
 *
 * IMPORTANT: Do not make this method "async". An async method wraps its
 *            return value in a regular Promise, so the PCancelable returned
 *            by "processRunExecutionData" would lose its "cancel" method and
 *            the execution could not be canceled anymore. As a consequence,
 *            errors that occur while preparing the data get thrown
 *            synchronously (like in "run").
 *
 * @param {Workflow} workflow The workflow to execute
 * @param {IRunData} runData Data of an earlier run. Supplies the input data of the start nodes and
 *                           becomes the initial "resultData.runData" of this execution
 * @param {string[]} startNodes Names of the nodes to start execution from
 * @param {string} destinationNode Name of the node to stop execution at
 * @returns {PCancelable<IRun>} The cancelable promise created by `processRunExecutionData`
 * @memberof WorkflowExecute
 */
runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): PCancelable<IRun> {
	let incomingNodeConnections: INodeConnections | undefined;
	let connection: IConnection;

	// Existing data is always read from the first run of a node
	const runIndex = 0;

	// Initialize the nodeExecutionStack and waitingExecution with
	// the data from runData
	const nodeExecutionStack: IExecuteData[] = [];
	const waitingExecution: IWaitingForExecution = {};
	for (const startNode of startNodes) {
		incomingNodeConnections = workflow.connectionsByDestinationNode[startNode];

		const incomingData: INodeExecutionData[][] = [];

		if (incomingNodeConnections === undefined) {
			// If it has no incoming data add the default empty data
			incomingData.push([
				{
					json: {},
				},
			]);
		} else {
			// Get the data of the incoming connections
			for (const connections of incomingNodeConnections.main) {
				for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
					connection = connections[inputIndex];
					incomingData.push(
						runData[connection.node!][runIndex].data![connection.type][connection.index]!,
					);
				}
			}
		}

		const executeData: IExecuteData = {
			node: workflow.getNode(startNode) as INode,
			data: {
				main: incomingData,
			},
		};

		nodeExecutionStack.push(executeData);

		// Check if the destinationNode has to be added as waiting
		// because some input data is already fully available
		// NOTE(review): this check runs once per start node, so with several
		//               start nodes the same input gets pushed repeatedly - confirm
		//               that this is intended.
		incomingNodeConnections = workflow.connectionsByDestinationNode[destinationNode];
		if (incomingNodeConnections !== undefined) {
			for (const connections of incomingNodeConnections.main) {
				for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
					connection = connections[inputIndex];

					// Make sure the structure to store the waiting data in exists
					if (waitingExecution[destinationNode] === undefined) {
						waitingExecution[destinationNode] = {};
					}
					if (waitingExecution[destinationNode][runIndex] === undefined) {
						waitingExecution[destinationNode][runIndex] = {};
					}
					if (waitingExecution[destinationNode][runIndex][connection.type] === undefined) {
						waitingExecution[destinationNode][runIndex][connection.type] = [];
					}

					if (runData[connection.node!] !== undefined) {
						// Input data exists so add as waiting
						waitingExecution[destinationNode][runIndex][connection.type].push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
					} else {
						// No data of that node yet, so mark the input as still missing
						waitingExecution[destinationNode][runIndex][connection.type].push(null);
					}
				}
			}
		}
	}

	// Only run the parent nodes and no others
	const runNodeFilter: string[] = workflow.getParentNodes(destinationNode);
	runNodeFilter.push(destinationNode);

	this.runExecutionData = {
		startData: {
			destinationNode,
			runNodeFilter,
		},
		resultData: {
			runData,
		},
		executionData: {
			contextData: {},
			nodeExecutionStack,
			waitingExecution,
		},
	};

	return this.processRunExecutionData(workflow);
}
|
|
|
|
|
|
|
|
/**
 * Executes the hook functions registered under the given name.
 * Does nothing if "additionalData" does not contain any hooks.
 *
 * @param {string} hookName Name of the hook to execute
 * @param {any[]} parameters Parameters which get handed over to the hook functions
 * @returns {Promise<void>}
 * @memberof WorkflowExecute
 */
async executeHook(hookName: string, parameters: any[]): Promise<void> { // tslint:disable-line:no-any
	const registeredHooks = this.additionalData.hooks;

	if (registeredHooks === undefined) {
		// No hooks are set so there is nothing to execute
		return;
	}

	return registeredHooks.executeHookFunctions(hookName, parameters);
}
|
|
|
|
|
|
/**
 * Checks that none of the given incoming connections did receive any data
 * in the given run.
 *
 * @param {IRunData} runData The run data to look up the output of the connected nodes in
 * @param {IConnection[]} inputConnections The incoming connections to check
 * @param {number} runIndex The index of the run to check
 * @returns {boolean} "false" as soon as one connection has at least one item in
 *                    "data.main" of its source node, else "true"
 * @memberof WorkflowExecute
 */
incomingConnectionIsEmpty(runData: IRunData, inputConnections: IConnection[], runIndex: number): boolean {
	for (const inputConnection of inputConnections) {
		// Hand over the path as array and not as string. A string gets parsed by
		// lodash, so a node name which contains ".", "[" or "]" would be split up
		// into multiple keys and the data of that node would never be found.
		const nodeIncomingData = get(runData, [inputConnection.node, runIndex, 'data', 'main', inputConnection.index]);

		// Missing data (undefined) and a "null" entry both count as empty
		if (nodeIncomingData !== undefined && nodeIncomingData !== null && (nodeIncomingData as object[]).length !== 0) {
			return false;
		}
	}
	return true;
}
|
|
|
|
|
|
/**
 * Hands the output of an executed node over to one of the nodes connected to it.
 *
 * A node with only one input gets added to the execution stack directly.
 * A node with multiple inputs gets parked in "waitingExecution" until none of
 * its inputs is "null" anymore. The first time such a node gets parked, the
 * nodes which supply its other inputs get checked and, if needed, a node
 * which has to run to create that data gets added to the execution stack.
 *
 * @param {Workflow} workflow The workflow which gets executed
 * @param {IConnection} connectionData The connection to the node that receives the data
 * @param {number} outputIndex The index of the parent node output the data comes from
 * @param {string} parentNodeName The name of the node which created the data
 * @param {INodeExecutionData[][]} nodeSuccessData The output data of the parent node
 * @param {number} runIndex The index of the current run
 * @memberof WorkflowExecute
 */
addNodeToBeExecuted(workflow: Workflow, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void {
	const targetName = connectionData.node;
	const targetInputs = workflow.connectionsByDestinationNode[targetName]['main'];

	// With multiple inputs the data of all of them has to be present
	// before the node can be added to the node-execution-stack
	const hasMultipleInputs = targetInputs.length > 1;

	const { waitingExecution, nodeExecutionStack } = this.runExecutionData.executionData!;

	// The data the node receives via this connection
	const incomingData = nodeSuccessData === null ? null : nodeSuccessData[outputIndex];

	if (hasMultipleInputs) {
		let nodeWasWaiting = true;

		if (waitingExecution[targetName] === undefined) {
			// First time that data arrives for that node
			waitingExecution[targetName] = {};
			nodeWasWaiting = false;
		}
		if (waitingExecution[targetName][runIndex] === undefined) {
			// No data for that run yet, so create one "null" slot per input
			waitingExecution[targetName][runIndex] = {
				main: new Array<INodeExecutionData[] | null>(targetInputs.length).fill(null),
			};
		}

		// Store the new data
		const waitingForRun = waitingExecution[targetName][runIndex];
		waitingForRun.main[connectionData.index] = incomingData;

		const inputStillMissing = waitingForRun.main.some((inputEntry) => inputEntry === null);

		if (!inputStillMissing) {
			// The data of all inputs is present so the node can be executed
			nodeExecutionStack.push({
				node: workflow.nodes[targetName],
				data: waitingForRun,
			});

			// It does not wait anymore so remove its data, and also the
			// whole node if no other run is left
			delete waitingExecution[targetName][runIndex];
			if (Object.keys(waitingExecution[targetName]).length === 0) {
				delete waitingExecution[targetName];
			}
			return;
		}

		if (!nodeWasWaiting) {
			const processedRunData = this.runExecutionData.resultData.runData;

			// Collect the names of all nodes connected to any output of the
			// parent node, to be able to check for siblings easier
			const checkOutputNodes: string[] = [];
			const parentOutputs = workflow.connectionsBySourceNode[parentNodeName].main;
			for (const parentOutputIndex in parentOutputs) {
				if (parentOutputs.hasOwnProperty(parentOutputIndex)) {
					for (const siblingConnection of parentOutputs[parentOutputIndex]) {
						checkOutputNodes.push(siblingConnection.node);
					}
				}
			}

			// The node got parked for the first time, so go through the nodes
			// which supply its inputs and make sure that each of them either
			// ran already or will run. If that is not the case add it.
			for (const inputConnections of targetInputs) {
				for (const inputData of inputConnections) {
					const inputNodeName = inputData.node;

					if (inputNodeName === parentNodeName) {
						// That is the node the data comes from, so it is available for sure
						continue;
					}

					const executionStackNodes = nodeExecutionStack.map((stackData) => stackData.node.name);

					// A node which is also connected to the parent node gets processed
					// anyway, unless its own incoming data is empty because then it
					// would not be executed
					if (checkOutputNodes.includes(inputNodeName)
						&& !this.incomingConnectionIsEmpty(processedRunData, workflow.connectionsByDestinationNode[inputNodeName].main[0], runIndex)) {
						continue;
					}

					// Nothing to do if it is already on the execution stack
					// or if it got processed already
					if (executionStackNodes.includes(inputNodeName) || processedRunData[inputNodeName] !== undefined) {
						continue;
					}

					// Go through the node and its parent nodes to find the one
					// which has to get added to the list of nodes to process
					const candidateNodes = workflow.getParentNodes(inputNodeName, 'main', -1);
					candidateNodes.push(inputNodeName);
					candidateNodes.reverse();

					let nodeToAdd: string | undefined = inputNodeName;
					for (const candidateNode of candidateNodes) {
						const runsViaParentNode = inputNodeName !== candidateNode && checkOutputNodes.includes(candidateNode);

						if (runsViaParentNode || executionStackNodes.includes(candidateNode)) {
							// It is either connected to the parent node or already on the
							// execution stack, so it will run anyway. Nothing to add.
							nodeToAdd = undefined;
							break;
						}

						if (processedRunData[candidateNode] !== undefined) {
							// Got processed already, so its output data can be used as input
							break;
						}

						nodeToAdd = candidateNode;
					}

					// Do not add the node if the parent node did not output any data and
					// the node to add is a child of it. Else a node would run which
					// should specifically not run, as it did not receive any data.
					const parentNodesNodeToAdd = workflow.getParentNodes(nodeToAdd as string);
					if (parentNodesNodeToAdd.includes(parentNodeName) && nodeSuccessData[outputIndex].length === 0) {
						nodeToAdd = undefined;
					}

					if (nodeToAdd === undefined) {
						// No node has to get added
						continue;
					}

					// Add the node only if it has no input connections or if its input
					// data is empty, because else it gets added by its input node later anyway
					const nodeToAddInputs = workflow.connectionsByDestinationNode[nodeToAdd];
					const addEmptyItem = nodeToAddInputs === undefined
						|| this.incomingConnectionIsEmpty(processedRunData, nodeToAddInputs.main[0], runIndex);

					if (addEmptyItem) {
						nodeExecutionStack.push({
							node: workflow.getNode(nodeToAdd) as INode,
							data: {
								main: [
									[
										{ json: {} },
									],
								],
							},
						});
					}
				}
			}
		}
	}

	// Create an array which has an entry for every input up to the current
	// one and which contains the data of the current execution
	const connectionDataArray: Array<INodeExecutionData[] | null> = [];
	for (let i = 0; i <= connectionData.index; i++) {
		connectionDataArray[i] = null;
	}
	connectionDataArray[connectionData.index] = incomingData;

	if (hasMultipleInputs) {
		// Getting here with multiple inputs means that data is still missing
		// (else it would have returned above) so the node has to wait.
		// NOTE(review): this replaces the entry prepared above with an array that
		//               only reaches up to the current input index - confirm that
		//               this is intended for nodes with more than two inputs.
		if (!waitingExecution.hasOwnProperty(targetName)) {
			waitingExecution[targetName] = {};
		}
		waitingExecution[targetName][runIndex] = {
			main: connectionDataArray,
		};
	} else {
		// All data is there so add it directly to stack
		nodeExecutionStack.push({
			node: workflow.nodes[targetName],
			data: {
				main: connectionDataArray,
			},
		});
	}
}
|
|
|
|
|
|
/**
|
|
* Runs the given execution data.
|
|
*
|
|
* @param {Workflow} workflow
|
|
* @returns {Promise<string>}
|
|
* @memberof WorkflowExecute
|
|
*/
|
|
processRunExecutionData(workflow: Workflow): PCancelable<IRun> {
|
|
Logger.verbose('Workflow execution started', { workflowId: workflow.id });
|
|
|
|
const startedAt = new Date();
|
|
|
|
const workflowIssues = workflow.checkReadyForExecution();
|
|
if (workflowIssues !== null) {
|
|
throw new Error('The workflow has issues and can for that reason not be executed. Please fix them first.');
|
|
}
|
|
|
|
// Variables which hold temporary data for each node-execution
|
|
let executionData: IExecuteData;
|
|
let executionError: ExecutionError | undefined;
|
|
let executionNode: INode;
|
|
let nodeSuccessData: INodeExecutionData[][] | null | undefined;
|
|
let runIndex: number;
|
|
let startTime: number;
|
|
let taskData: ITaskData;
|
|
|
|
if (this.runExecutionData.startData === undefined) {
|
|
this.runExecutionData.startData = {};
|
|
}
|
|
|
|
if (this.runExecutionData.waitTill) {
|
|
const lastNodeExecuted = this.runExecutionData.resultData.lastNodeExecuted as string;
|
|
this.runExecutionData.executionData!.nodeExecutionStack[0].node.disabled = true;
|
|
this.runExecutionData.waitTill = undefined;
|
|
this.runExecutionData.resultData.runData[lastNodeExecuted].pop();
|
|
}
|
|
|
|
let currentExecutionTry = '';
|
|
let lastExecutionTry = '';
|
|
|
|
return new PCancelable((resolve, reject, onCancel) => {
|
|
let gotCancel = false;
|
|
|
|
onCancel.shouldReject = false;
|
|
onCancel(() => {
|
|
gotCancel = true;
|
|
});
|
|
|
|
const returnPromise = (async () => {
|
|
try {
|
|
await this.executeHook('workflowExecuteBefore', [workflow]);
|
|
} catch (error) {
|
|
|
|
// Set the error that it can be saved correctly
|
|
executionError = {
|
|
...error,
|
|
message: error.message,
|
|
stack: error.stack,
|
|
};
|
|
|
|
// Set the incoming data of the node that it can be saved correctly
|
|
executionData = this.runExecutionData.executionData!.nodeExecutionStack[0] as IExecuteData;
|
|
this.runExecutionData.resultData = {
|
|
runData: {
|
|
[executionData.node.name]: [
|
|
{
|
|
startTime,
|
|
executionTime: (new Date().getTime()) - startTime,
|
|
data: ({
|
|
'main': executionData.data.main,
|
|
} as ITaskDataConnections),
|
|
},
|
|
],
|
|
},
|
|
lastNodeExecuted: executionData.node.name,
|
|
error: executionError,
|
|
};
|
|
|
|
throw error;
|
|
}
|
|
|
|
executionLoop:
|
|
while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) {
|
|
|
|
if (this.additionalData.executionTimeoutTimestamp !== undefined && Date.now() >= this.additionalData.executionTimeoutTimestamp) {
|
|
gotCancel = true;
|
|
}
|
|
|
|
// @ts-ignore
|
|
if (gotCancel === true) {
|
|
return Promise.resolve();
|
|
}
|
|
|
|
nodeSuccessData = null;
|
|
executionError = undefined;
|
|
executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
|
|
executionNode = executionData.node;
|
|
|
|
Logger.debug(`Start processing node "${executionNode.name}"`, { node: executionNode.name, workflowId: workflow.id });
|
|
await this.executeHook('nodeExecuteBefore', [executionNode.name]);
|
|
|
|
// Get the index of the current run
|
|
runIndex = 0;
|
|
if (this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
|
|
runIndex = this.runExecutionData.resultData.runData[executionNode.name].length;
|
|
}
|
|
|
|
currentExecutionTry = `${executionNode.name}:${runIndex}`;
|
|
|
|
if (currentExecutionTry === lastExecutionTry) {
|
|
throw new Error('Did stop execution because execution seems to be in endless loop.');
|
|
}
|
|
|
|
if (this.runExecutionData.startData!.runNodeFilter !== undefined && this.runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) {
|
|
// If filter is set and node is not on filter skip it, that avoids the problem that it executes
|
|
// leafs that are parallel to a selected destinationNode. Normally it would execute them because
|
|
// they have the same parent and it executes all child nodes.
|
|
continue;
|
|
}
|
|
|
|
// Check if all the data which is needed to run the node is available
|
|
if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) {
|
|
// Check if the node has incoming connections
|
|
if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) {
|
|
let inputConnections: IConnection[][];
|
|
let connectionIndex: number;
|
|
|
|
inputConnections = workflow.connectionsByDestinationNode[executionNode.name]['main'];
|
|
|
|
for (connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) {
|
|
if (workflow.getHighestNode(executionNode.name, 'main', connectionIndex).length === 0) {
|
|
// If there is no valid incoming node (if all are disabled)
|
|
// then ignore that it has inputs and simply execute it as it is without
|
|
// any data
|
|
continue;
|
|
}
|
|
|
|
if (!executionData.data!.hasOwnProperty('main')) {
|
|
// ExecutionData does not even have the connection set up so can
|
|
// not have that data, so add it again to be executed later
|
|
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
|
|
lastExecutionTry = currentExecutionTry;
|
|
continue executionLoop;
|
|
}
|
|
|
|
// Check if it has the data for all the inputs
|
|
// The most nodes just have one but merge node for example has two and data
|
|
// of both inputs has to be available to be able to process the node.
|
|
if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) {
|
|
// Does not have the data of the connections so add back to stack
|
|
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
|
|
lastExecutionTry = currentExecutionTry;
|
|
continue executionLoop;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Clone input data that nodes can not mess up data of parallel nodes which receive the same data
|
|
// TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned
|
|
// is very slow so only do if needed
|
|
startTime = new Date().getTime();
|
|
|
|
let maxTries = 1;
|
|
if (executionData.node.retryOnFail === true) {
|
|
// TODO: Remove the hardcoded default-values here and also in NodeSettings.vue
|
|
maxTries = Math.min(5, Math.max(2, executionData.node.maxTries || 3));
|
|
}
|
|
|
|
let waitBetweenTries = 0;
|
|
if (executionData.node.retryOnFail === true) {
|
|
// TODO: Remove the hardcoded default-values here and also in NodeSettings.vue
|
|
waitBetweenTries = Math.min(5000, Math.max(0, executionData.node.waitBetweenTries || 1000));
|
|
}
|
|
|
|
for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
|
|
// @ts-ignore
|
|
if (gotCancel === true) {
|
|
return Promise.resolve();
|
|
}
|
|
try {
|
|
|
|
if (tryIndex !== 0) {
|
|
// Reset executionError from previous error try
|
|
executionError = undefined;
|
|
if (waitBetweenTries !== 0) {
|
|
// TODO: Improve that in the future and check if other nodes can
|
|
// be executed in the meantime
|
|
await new Promise((resolve) => {
|
|
setTimeout(() => {
|
|
resolve(undefined);
|
|
}, waitBetweenTries);
|
|
});
|
|
}
|
|
}
|
|
|
|
Logger.debug(`Running node "${executionNode.name}" started`, { node: executionNode.name, workflowId: workflow.id });
|
|
nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode);
|
|
Logger.debug(`Running node "${executionNode.name}" finished successfully`, { node: executionNode.name, workflowId: workflow.id });
|
|
|
|
if (nodeSuccessData === undefined) {
|
|
// Node did not get executed
|
|
nodeSuccessData = null;
|
|
} else {
|
|
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
|
|
}
|
|
|
|
if (nodeSuccessData === null || nodeSuccessData[0][0] === undefined) {
|
|
if (executionData.node.alwaysOutputData === true) {
|
|
nodeSuccessData = nodeSuccessData || [];
|
|
nodeSuccessData[0] = [
|
|
{
|
|
json: {},
|
|
},
|
|
];
|
|
}
|
|
}
|
|
|
|
if (nodeSuccessData === null && !this.runExecutionData.waitTill!!) {
|
|
// If null gets returned it means that the node did succeed
|
|
// but did not have any data. So the branch should end
|
|
// (meaning the nodes afterwards should not be processed)
|
|
continue executionLoop;
|
|
}
|
|
|
|
break;
|
|
} catch (error) {
|
|
|
|
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
|
|
|
|
executionError = {
|
|
...error,
|
|
message: error.message,
|
|
stack: error.stack,
|
|
};
|
|
|
|
Logger.debug(`Running node "${executionNode.name}" finished with error`, { node: executionNode.name, workflowId: workflow.id });
|
|
}
|
|
}
|
|
|
|
// Add the data to return to the user
|
|
// (currently does not get cloned as data does not get changed, maybe later we should do that?!?!)
|
|
|
|
if (!this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
|
|
this.runExecutionData.resultData.runData[executionNode.name] = [];
|
|
}
|
|
taskData = {
|
|
startTime,
|
|
executionTime: (new Date().getTime()) - startTime,
|
|
};
|
|
|
|
if (executionError !== undefined) {
|
|
taskData.error = executionError;
|
|
|
|
if (executionData.node.continueOnFail === true) {
|
|
// Workflow should continue running even if node errors
|
|
if (executionData.data.hasOwnProperty('main') && executionData.data.main.length > 0) {
|
|
// Simply get the input data of the node if it has any and pass it through
|
|
// to the next node
|
|
if (executionData.data.main[0] !== null) {
|
|
nodeSuccessData = [executionData.data.main[0] as INodeExecutionData[]];
|
|
}
|
|
}
|
|
} else {
|
|
// Node execution did fail so add error and stop execution
|
|
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
|
|
|
|
// Add the execution data again so that it can get restarted
|
|
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
|
|
|
|
await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData]);
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Node executed successfully. So add data and go on.
|
|
taskData.data = ({
|
|
'main': nodeSuccessData,
|
|
} as ITaskDataConnections);
|
|
|
|
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
|
|
|
|
if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) {
|
|
// Before stopping, make sure we are executing hooks so
|
|
// That frontend is notified for example for manual executions.
|
|
await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData]);
|
|
|
|
// If destination node is defined and got executed stop execution
|
|
continue;
|
|
}
|
|
|
|
if (this.runExecutionData.waitTill!!) {
|
|
await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData]);
|
|
|
|
// Add the node back to the stack that the workflow can start to execute again from that node
|
|
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
|
|
|
|
break;
|
|
}
|
|
|
|
// Add the nodes to which the current node has an output connection to that they can
|
|
// be executed next
|
|
if (workflow.connectionsBySourceNode.hasOwnProperty(executionNode.name)) {
|
|
if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) {
|
|
let outputIndex: string, connectionData: IConnection;
|
|
// Iterate over all the outputs
|
|
|
|
// Add the nodes to be executed
|
|
for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) {
|
|
if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) {
|
|
continue;
|
|
}
|
|
|
|
// Iterate over all the different connections of this output
|
|
for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) {
|
|
if (!workflow.nodes.hasOwnProperty(connectionData.node)) {
|
|
return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`));
|
|
}
|
|
|
|
if (nodeSuccessData![outputIndex] && (nodeSuccessData![outputIndex].length !== 0 || connectionData.index > 0)) {
|
|
// Add the node only if it did execute or if connected to second "optional" input
|
|
this.addNodeToBeExecuted(workflow, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// If we got here, it means that we did not stop executing from manual executions / destination.
|
|
// Execute hooks now to make sure that all hooks are executed properly
|
|
// Await is needed to make sure that we don't fall into concurrency problems
|
|
// When saving node execution data
|
|
await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData]);
|
|
|
|
}
|
|
|
|
return Promise.resolve();
|
|
})()
|
|
.then(async () => {
|
|
if (gotCancel && executionError === undefined) {
|
|
return this.processSuccessExecution(startedAt, workflow, new WorkflowOperationError('Workflow has been canceled or timed out!'));
|
|
}
|
|
return this.processSuccessExecution(startedAt, workflow, executionError);
|
|
})
|
|
.catch(async (error) => {
|
|
const fullRunData = this.getFullRunData(startedAt);
|
|
|
|
fullRunData.data.resultData.error = {
|
|
...error,
|
|
message: error.message,
|
|
stack: error.stack,
|
|
};
|
|
|
|
// Check if static data changed
|
|
let newStaticData: IDataObject | undefined;
|
|
if (workflow.staticData.__dataChanged === true) {
|
|
// Static data of workflow changed
|
|
newStaticData = workflow.staticData;
|
|
}
|
|
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(error => {
|
|
console.error('There was a problem running hook "workflowExecuteAfter"', error);
|
|
});
|
|
|
|
return fullRunData;
|
|
});
|
|
|
|
return returnPromise.then(resolve);
|
|
});
|
|
}
|
|
|
|
|
|
/**
 * Finalizes a workflow run and notifies the "workflowExecuteAfter" hook.
 *
 * Despite the name, this also handles runs that ended with an error: the
 * outcome written to the run data depends on the arguments and current state.
 *   - `executionError` given          -> stored on `data.resultData.error`
 *   - `runExecutionData.waitTill` set -> copied to the run data (`finished` is not set)
 *   - otherwise                       -> `finished` is set to `true`
 *
 * @param startedAt       Time the execution started; forwarded to `getFullRunData`.
 * @param workflow        Workflow that was executed; its `id` is logged and its
 *                        `staticData` is handed to the hook when flagged as changed.
 * @param executionError  Error that ended the run, if there was one.
 * @returns The assembled run data, after the hook has been awaited.
 */
// NOTE(review): the suppression below is presumably needed because an async
// method yields a plain Promise rather than the declared PCancelable - confirm.
// @ts-ignore
async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: ExecutionError): PCancelable<IRun> {
	const runResult = this.getFullRunData(startedAt);

	if (executionError === undefined) {
		const resumeAt = this.runExecutionData.waitTill;

		if (resumeAt) {
			// The execution is paused, not done: hand the resume time on to the caller
			Logger.verbose(`Workflow execution will wait until ${resumeAt}`, { workflowId: workflow.id });
			runResult.waitTill = resumeAt;
		} else {
			Logger.verbose('Workflow execution finished successfully', { workflowId: workflow.id });
			runResult.finished = true;
		}
	} else {
		Logger.verbose('Workflow execution finished with error', { error: executionError, workflowId: workflow.id });

		// "message" and "stack" are set explicitly in addition to the spread so that
		// they are always present on the stored error object
		const { message, stack } = executionError;
		runResult.data.resultData.error = { ...executionError, message, stack } as ExecutionError;
	}

	// Only pass the static data on to the hook if it got flagged as changed
	const changedStaticData: IDataObject | undefined =
		workflow.staticData.__dataChanged === true ? workflow.staticData : undefined;

	await this.executeHook('workflowExecuteAfter', [runResult, changedStaticData]);

	return runResult;
}
|
|
|
|
/**
 * Assembles the run data object describing the current execution.
 *
 * @param startedAt  Time the execution started; stored unchanged.
 * @returns An `IRun` holding the instance's `runExecutionData` and `mode`, the
 *          given start time, and the moment of this call as `stoppedAt`.
 */
getFullRunData(startedAt: Date): IRun {
	return {
		data: this.runExecutionData,
		mode: this.mode,
		startedAt,
		stoppedAt: new Date(),
	};
}
|
|
|
|
}
|