feat(core): Support executing single nodes not part of a graph as a partial execution (#13529)

This commit is contained in:
Danny Martini 2025-02-27 09:35:52 +01:00 committed by GitHub
parent 223ec2d9c9
commit 8a34f027c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 135 additions and 15 deletions

View file

@ -52,6 +52,10 @@ import { WorkflowExecute } from '../workflow-execute';
const nodeTypes = Helpers.NodeTypes(); const nodeTypes = Helpers.NodeTypes();
beforeEach(() => {
jest.resetAllMocks();
});
describe('WorkflowExecute', () => { describe('WorkflowExecute', () => {
describe('v0 execution order', () => { describe('v0 execution order', () => {
const tests: WorkflowTestData[] = legacyWorkflowExecuteTests; const tests: WorkflowTestData[] = legacyWorkflowExecuteTests;
@ -455,6 +459,56 @@ describe('WorkflowExecute', () => {
new Set([node1]), new Set([node1]),
); );
}); });
// ►►
// ┌──────┐
// │orphan│
// └──────┘
// ┌───────┐ ┌───────────┐
// │trigger├────►│destination│
// └───────┘ └───────────┘
test('works with a single node', async () => {
// ARRANGE
const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
const trigger = createNodeData({ name: 'trigger' });
const destination = createNodeData({ name: 'destination' });
const orphan = createNodeData({ name: 'orphan' });
const workflow = new DirectedGraph()
.addNodes(trigger, destination, orphan)
.addConnections({ from: trigger, to: destination })
.toWorkflow({ name: '', active: false, nodeTypes });
const pinData: IPinData = {};
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[destination.name]: [toITaskData([{ data: { nodeName: destination.name } }])],
};
const dirtyNodeNames: string[] = [];
const processRunExecutionDataSpy = jest
.spyOn(workflowExecute, 'processRunExecutionData')
.mockImplementationOnce(jest.fn());
// ACT
await workflowExecute.runPartialWorkflow2(
workflow,
runData,
pinData,
dirtyNodeNames,
orphan.name,
);
// ASSERT
expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1);
expect(processRunExecutionDataSpy).toHaveBeenCalledWith(
new DirectedGraph().addNode(orphan).toWorkflow({ ...workflow }),
);
});
}); });
describe('checkReadyForExecution', () => { describe('checkReadyForExecution', () => {
@ -465,19 +519,20 @@ describe('WorkflowExecute', () => {
const nodeParamIssuesSpy = jest.spyOn(NodeHelpers, 'getNodeParametersIssues'); const nodeParamIssuesSpy = jest.spyOn(NodeHelpers, 'getNodeParametersIssues');
const nodeTypes = mock<INodeTypes>(); const nodeTypes = mock<INodeTypes>();
nodeTypes.getByNameAndVersion.mockImplementation((type) => {
// TODO: getByNameAndVersion signature needs to be updated to allow returning undefined beforeEach(() => {
if (type === 'unknownNode') return undefined as unknown as INodeType; nodeTypes.getByNameAndVersion.mockImplementation((type) => {
return mock<INodeType>({ // TODO: getByNameAndVersion signature needs to be updated to allow returning undefined
description: { if (type === 'unknownNode') return undefined as unknown as INodeType;
properties: [], return mock<INodeType>({
}, description: {
properties: [],
},
});
}); });
}); });
const workflowExecute = new WorkflowExecute(mock(), 'manual'); const workflowExecute = new WorkflowExecute(mock(), 'manual');
beforeEach(() => jest.clearAllMocks());
it('should return null if there are no nodes', () => { it('should return null if there are no nodes', () => {
const workflow = new Workflow({ const workflow = new Workflow({
nodes: [], nodes: [],
@ -562,7 +617,9 @@ describe('WorkflowExecute', () => {
}, },
}); });
nodeTypes.getByNameAndVersion.mockReturnValue(triggerNodeType); beforeEach(() => {
nodeTypes.getByNameAndVersion.mockReturnValue(triggerNodeType);
});
const workflow = new Workflow({ const workflow = new Workflow({
nodeTypes, nodeTypes,

View file

@ -33,6 +33,27 @@ describe('findSubgraph', () => {
expect(subgraph).toEqual(graph); expect(subgraph).toEqual(graph);
}); });
// ►►
// ┌──────┐
// │orphan│
// └──────┘
// ┌───────┐ ┌───────────┐
// │trigger├────►│destination│
// └───────┘ └───────────┘
test('works with a single node', () => {
const trigger = createNodeData({ name: 'trigger' });
const destination = createNodeData({ name: 'destination' });
const orphan = createNodeData({ name: 'orphan' });
const graph = new DirectedGraph()
.addNodes(trigger, destination, orphan)
.addConnections({ from: trigger, to: destination });
const subgraph = findSubgraph({ graph, destination: orphan, trigger: orphan });
expect(subgraph).toEqual(new DirectedGraph().addNode(orphan));
});
// ►► // ►►
// ┌───────┐ ┌───────────┐ // ┌───────┐ ┌───────────┐
// │ ├────────►│ │ // │ ├────────►│ │

View file

@ -13,6 +13,12 @@ function findSubgraphRecursive(
) { ) {
// If the current node is the chosen trigger keep this branch. // If the current node is the chosen trigger keep this branch.
if (current === trigger) { if (current === trigger) {
// If this graph consists of only one node there won't be any connections
// and the loop below won't add anything.
// We're adding the trigger here so that graphs with one node and no
// connections are handled correctly.
newGraph.addNode(trigger);
for (const connection of currentBranch) { for (const connection of currentBranch) {
newGraph.addNodes(connection.from, connection.to); newGraph.addNodes(connection.from, connection.to);
newGraph.addConnection(connection); newGraph.addConnection(connection);

View file

@ -354,16 +354,52 @@ export class WorkflowExecute {
`Could not find a node with the name ${destinationNodeName} in the workflow.`, `Could not find a node with the name ${destinationNodeName} in the workflow.`,
); );
let graph = DirectedGraph.fromWorkflow(workflow);
// Edge Case 1:
// Support executing a single node that is not connected to a trigger
const destinationHasNoParents = graph.getDirectParentConnections(destination).length === 0;
if (destinationHasNoParents) {
// short cut here, only create a subgraph and the stacks
graph = findSubgraph({
graph: filterDisabledNodes(graph),
destination,
trigger: destination,
});
const filteredNodes = graph.getNodes();
runData = cleanRunData(runData, graph, new Set([destination]));
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {});
this.status = 'running';
this.runExecutionData = {
startData: {
destinationNode: destinationNodeName,
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
},
resultData: {
runData,
pinData,
},
executionData: {
contextData: {},
nodeExecutionStack,
metadata: {},
waitingExecution,
waitingExecutionSource,
},
};
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
}
// 1. Find the Trigger // 1. Find the Trigger
const trigger = findTriggerForPartialExecution(workflow, destinationNodeName); const trigger = findTriggerForPartialExecution(workflow, destinationNodeName);
if (trigger === undefined) { if (trigger === undefined) {
throw new ApplicationError( throw new ApplicationError('Connect a trigger to run this node');
'The destination node is not connected to any trigger. Partial executions need a trigger.',
);
} }
// 2. Find the Subgraph // 2. Find the Subgraph
let graph = DirectedGraph.fromWorkflow(workflow);
graph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger }); graph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger });
const filteredNodes = graph.getNodes(); const filteredNodes = graph.getNodes();
@ -380,7 +416,7 @@ export class WorkflowExecute {
// 7. Recreate Execution Stack // 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, new Set(startNodes), runData, pinData ?? {}); recreateNodeExecutionStack(graph, startNodes, runData, pinData ?? {});
// 8. Execute // 8. Execute
this.status = 'running'; this.status = 'running';