mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
🐛 Fix bug that some nodes got executed twice
This commit is contained in:
parent
132f32132a
commit
bf174a4099
|
@ -241,6 +241,211 @@ export class WorkflowExecute {
|
|||
}
|
||||
|
||||
|
||||
addNodeToBeExecuted(workflow: Workflow, runExecutionData: IRunExecutionData, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void {
|
||||
let stillDataMissing = false;
|
||||
|
||||
// Check if node has multiple inputs as then we have to wait for all input data
|
||||
// to be present before we can add it to the node-execution-stack
|
||||
if (workflow.connectionsByDestinationNode[connectionData.node]['main'].length > 1) {
|
||||
// Node has multiple inputs
|
||||
let nodeWasWaiting = true;
|
||||
|
||||
// Check if there is already data for the node
|
||||
if (runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined) {
|
||||
// Node does not have data yet so create a new empty one
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
|
||||
nodeWasWaiting = false;
|
||||
}
|
||||
if (runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] === undefined) {
|
||||
// Node does not have data for runIndex yet so create also empty one and init it
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
|
||||
main: []
|
||||
};
|
||||
for (let i = 0; i < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; i++) {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.push(null);
|
||||
}
|
||||
}
|
||||
|
||||
// Add the new data
|
||||
if (nodeSuccessData === null) {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null;
|
||||
} else {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex];
|
||||
}
|
||||
|
||||
// Check if all data exists now
|
||||
let thisExecutionData: INodeExecutionData[] | null;
|
||||
let allDataFound = true;
|
||||
for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) {
|
||||
thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i];
|
||||
if (thisExecutionData === null) {
|
||||
allDataFound = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (allDataFound === true) {
|
||||
// All data exists for node to be executed
|
||||
// So add it to the execution stack
|
||||
runExecutionData.executionData!.nodeExecutionStack.push({
|
||||
node: workflow.nodes[connectionData.node],
|
||||
data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]
|
||||
});
|
||||
|
||||
// Remove the data from waiting
|
||||
delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
|
||||
|
||||
if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) {
|
||||
// No more data left for the node so also delete that one
|
||||
delete runExecutionData.executionData!.waitingExecution[connectionData.node];
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
stillDataMissing = true;
|
||||
}
|
||||
|
||||
if (nodeWasWaiting === false) {
|
||||
|
||||
// Get a list of all the output nodes that we can check for siblings eaiser
|
||||
const checkOutputNodes = [];
|
||||
for (const outputIndexParent in workflow.connectionsBySourceNode[parentNodeName].main) {
|
||||
if (!workflow.connectionsBySourceNode[parentNodeName].main.hasOwnProperty(outputIndexParent)) {
|
||||
continue;
|
||||
}
|
||||
for (const connectionDataCheck of workflow.connectionsBySourceNode[parentNodeName].main[outputIndexParent]) {
|
||||
checkOutputNodes.push(connectionDataCheck.node);
|
||||
}
|
||||
}
|
||||
|
||||
// Node was not on "waitingExecution" so it is the first time it gets
|
||||
// checked. So we have to go through all the inputs and check if they
|
||||
// are already on the list to be processed.
|
||||
// If that is not the case add it.
|
||||
for (let inputIndex = 0; inputIndex < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; inputIndex++) {
|
||||
for (const inputData of workflow.connectionsByDestinationNode[connectionData.node]['main'][inputIndex]) {
|
||||
if (inputData.node === parentNodeName) {
|
||||
// Is the node we come from so its data will be available for sure
|
||||
continue;
|
||||
}
|
||||
|
||||
const executionStackNodes = runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name);
|
||||
|
||||
// Check if that node is also an output connection of the
|
||||
// previously processed one
|
||||
if (inputData.node !== parentNodeName && checkOutputNodes.includes(inputData.node)) {
|
||||
// So the parent node will be added anyway which
|
||||
// will then process this node next. So nothing to do.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if it is already in the execution stack
|
||||
if (executionStackNodes.includes(inputData.node)) {
|
||||
// Node is already on the list to be executed
|
||||
// so there is nothing to do
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if node got processed already
|
||||
if (runExecutionData.resultData.runData[inputData.node] !== undefined) {
|
||||
// Node got processed already so no need to add it
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if any of the parent nodes does not have any inputs. That
|
||||
// would mean that it has to get added to the list of nodes to process.
|
||||
const parentNodes = workflow.getParentNodes(inputData.node, 'main', -1);
|
||||
let nodeToAdd: string | undefined = inputData.node;
|
||||
parentNodes.push(inputData.node);
|
||||
parentNodes.reverse();
|
||||
|
||||
for (const parentNode of parentNodes) {
|
||||
// Check if that node is also an output connection of the
|
||||
// previously processed one
|
||||
if (inputData.node !== parentNode && checkOutputNodes.includes(parentNode)) {
|
||||
// So the parent node will be added anyway which
|
||||
// will then process this node next. So nothing to do.
|
||||
nodeToAdd = undefined;
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if it is already in the execution stack
|
||||
if (executionStackNodes.includes(parentNode)) {
|
||||
// Node is already on the list to be executed
|
||||
// so there is nothing to do
|
||||
nodeToAdd = undefined;
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if node got processed already
|
||||
if (runExecutionData.resultData.runData[parentNode] !== undefined) {
|
||||
// Node got processed already so we can use the
|
||||
// output data as input of this node
|
||||
break;
|
||||
}
|
||||
|
||||
nodeToAdd = parentNode;
|
||||
}
|
||||
|
||||
if (nodeToAdd === undefined) {
|
||||
// No node has to get added so process
|
||||
continue;
|
||||
}
|
||||
|
||||
if (workflow.connectionsByDestinationNode[nodeToAdd] === undefined) {
|
||||
// Add only node if it does not have any inputs becuase else it will
|
||||
// be added by its input node later anyway.
|
||||
runExecutionData.executionData!.nodeExecutionStack.push(
|
||||
{
|
||||
node: workflow.getNode(nodeToAdd) as INode,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the array has all the values
|
||||
const connectionDataArray: Array<INodeExecutionData[] | null> = [];
|
||||
for (let i: number = connectionData.index; i >= 0; i--) {
|
||||
connectionDataArray[i] = null;
|
||||
}
|
||||
|
||||
// Add the data of the current execution
|
||||
if (nodeSuccessData === null) {
|
||||
connectionDataArray[connectionData.index] = null;
|
||||
} else {
|
||||
connectionDataArray[connectionData.index] = nodeSuccessData[outputIndex];
|
||||
}
|
||||
|
||||
if (stillDataMissing === true) {
|
||||
// Additional data is needed to run node so add it to waiting
|
||||
if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
|
||||
}
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
|
||||
main: connectionDataArray
|
||||
};
|
||||
} else {
|
||||
// All data is there so add it directly to stack
|
||||
runExecutionData.executionData!.nodeExecutionStack.push({
|
||||
node: workflow.nodes[connectionData.node],
|
||||
data: {
|
||||
main: connectionDataArray
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Runs the given execution data.
|
||||
|
@ -353,8 +558,6 @@ export class WorkflowExecute {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO Has to check if node is disabled
|
||||
|
||||
// Clone input data that nodes can not mess up data of parallel nodes which receive the same data
|
||||
// TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned
|
||||
// is very slow so only do if needed
|
||||
|
@ -471,152 +674,26 @@ export class WorkflowExecute {
|
|||
if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) {
|
||||
let outputIndex: string, connectionData: IConnection;
|
||||
// Go over all the different
|
||||
|
||||
// Add the nodes to be executed
|
||||
for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) {
|
||||
if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Go through all the different outputs of this connection
|
||||
for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) {
|
||||
if (!workflow.nodes.hasOwnProperty(connectionData.node)) {
|
||||
return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`));
|
||||
}
|
||||
|
||||
let stillDataMissing = false;
|
||||
|
||||
// Check if node has multiple inputs as then we have to wait for all input data
|
||||
// to be present before we can add it to the node-execution-stack
|
||||
if (workflow.connectionsByDestinationNode[connectionData.node]['main'].length > 1) {
|
||||
// Node has multiple inputs
|
||||
|
||||
// Check if there is already data for the node
|
||||
if (runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node) && runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] !== undefined) {
|
||||
// There is already data for the node and the current run so
|
||||
// add the new data
|
||||
if (nodeSuccessData === null) {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null;
|
||||
} else {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex];
|
||||
}
|
||||
|
||||
// Check if all data exists now
|
||||
let thisExecutionData: INodeExecutionData[] | null;
|
||||
let allDataFound = true;
|
||||
for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) {
|
||||
thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i];
|
||||
if (thisExecutionData === null) {
|
||||
allDataFound = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (allDataFound === true) {
|
||||
// All data exists for node to be executed
|
||||
// So add it to the execution stack
|
||||
runExecutionData.executionData!.nodeExecutionStack.push({
|
||||
node: workflow.nodes[connectionData.node],
|
||||
data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]
|
||||
});
|
||||
|
||||
// Remove the data from waiting
|
||||
delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
|
||||
|
||||
if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) {
|
||||
// No more data left for the node so also delete that one
|
||||
delete runExecutionData.executionData!.waitingExecution[connectionData.node];
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
stillDataMissing = true;
|
||||
}
|
||||
} else {
|
||||
stillDataMissing = true;
|
||||
|
||||
// Node was not on "waitingExecution" so it is the first time it gets
|
||||
// checked. So we have to go through all the inputs and check if they
|
||||
// are already on the list to be processed.
|
||||
// If that is not the case add it.
|
||||
for (let inputIndex = 0; inputIndex < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; inputIndex++) {
|
||||
for (const inputData of workflow.connectionsByDestinationNode[connectionData.node]['main'][inputIndex]) {
|
||||
if (inputData.node === executionNode.name) {
|
||||
// Is the node we come from so its data is available for sure
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get the most top nodes to know where to start to process from
|
||||
const inputStartNodes = workflow.getStartNodes(inputData.node);
|
||||
|
||||
for (const startNode of inputStartNodes) {
|
||||
// Check if the node has to be added to be processed
|
||||
|
||||
// Check if node got processed already
|
||||
if (runExecutionData.resultData.runData[startNode.name] !== undefined) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if it is already in the execution stack
|
||||
const executionStackNodes = runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name);
|
||||
if (executionStackNodes.includes(startNode.name)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add is currently missing so add it
|
||||
runExecutionData.executionData!.nodeExecutionStack.push(
|
||||
{
|
||||
node: startNode,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the array has all the values
|
||||
const connectionDataArray: Array<INodeExecutionData[] | null> = [];
|
||||
for (let i: number = connectionData.index; i >= 0; i--) {
|
||||
connectionDataArray[i] = null;
|
||||
}
|
||||
|
||||
// Add the data of the current execution
|
||||
if (nodeSuccessData === null) {
|
||||
connectionDataArray[connectionData.index] = null;
|
||||
} else {
|
||||
connectionDataArray[connectionData.index] = nodeSuccessData[outputIndex];
|
||||
}
|
||||
|
||||
if (stillDataMissing === true) {
|
||||
// Additional data is needed to run node so add it to waiting
|
||||
if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) {
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
|
||||
}
|
||||
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
|
||||
main: connectionDataArray
|
||||
};
|
||||
} else {
|
||||
// All data is there so add it directly to stack
|
||||
runExecutionData.executionData!.nodeExecutionStack.push({
|
||||
node: workflow.nodes[connectionData.node],
|
||||
data: {
|
||||
main: connectionDataArray
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
this.addNodeToBeExecuted(workflow, runExecutionData, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.resolve();
|
||||
})()
|
||||
.then(async () => {
|
||||
|
|
Loading…
Reference in a new issue