From b8da4ff9edb0fbb0093c4c41fe11f8e67b696ca3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20G=C3=B3mez=20Morales?= Date: Thu, 28 Nov 2024 14:04:55 +0100 Subject: [PATCH] fix(editor): Implement dirty nodes for partial executions (#11739) Co-authored-by: Danny Martini --- packages/cli/src/workflow-runner.ts | 3 +- .../workflows/workflow-execution.service.ts | 9 ++- .../cli/src/workflows/workflow.request.ts | 1 + .../__tests__/findStartNodes.test.ts | 61 +++++++++++++++-- .../PartialExecutionUtils/findStartNodes.ts | 8 +-- packages/core/src/WorkflowExecute.ts | 7 +- packages/core/test/WorkflowExecute.test.ts | 52 ++++++++++++++- packages/core/test/helpers/constants.ts | 5 ++ packages/editor-ui/src/Interface.ts | 1 + .../src/composables/useRunWorkflow.test.ts | 65 ++++++++++++++++++- .../src/composables/useRunWorkflow.ts | 26 ++++++++ .../src/stores/workflows.store.test.ts | 45 +++++++++++++ .../editor-ui/src/stores/workflows.store.ts | 4 ++ packages/workflow/src/Interfaces.ts | 1 + 14 files changed, 270 insertions(+), 18 deletions(-) diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 1dd646c1c1..6cade903d0 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -317,8 +317,9 @@ export class WorkflowRunner { workflowExecution = workflowExecute.runPartialWorkflow2( workflow, data.runData, - data.destinationNode, data.pinData, + data.dirtyNodeNames, + data.destinationNode, ); } else { workflowExecution = workflowExecute.runPartialWorkflow( diff --git a/packages/cli/src/workflows/workflow-execution.service.ts b/packages/cli/src/workflows/workflow-execution.service.ts index dd8480d759..d6ef8b4f93 100644 --- a/packages/cli/src/workflows/workflow-execution.service.ts +++ b/packages/cli/src/workflows/workflow-execution.service.ts @@ -89,7 +89,13 @@ export class WorkflowExecutionService { } async executeManually( - { workflowData, runData, startNodes, destinationNode }: WorkflowRequest.ManualRunPayload, + { + workflowData, + runData, + startNodes, + destinationNode, + dirtyNodeNames, + }: WorkflowRequest.ManualRunPayload, user: User, pushRef?: string, partialExecutionVersion?: string, @@ -137,6 +143,7 @@ export class WorkflowExecutionService { workflowData, userId: user.id, partialExecutionVersion: partialExecutionVersion ?? '0', + dirtyNodeNames, }; const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; diff --git a/packages/cli/src/workflows/workflow.request.ts b/packages/cli/src/workflows/workflow.request.ts index d45cfd14d3..abfb58026a 100644 --- a/packages/cli/src/workflows/workflow.request.ts +++ b/packages/cli/src/workflows/workflow.request.ts @@ -22,6 +22,7 @@ export declare namespace WorkflowRequest { runData: IRunData; startNodes?: StartNodeData[]; destinationNode?: string; + dirtyNodeNames?: string[]; }; type Create = AuthenticatedRequest<{}, {}, CreateUpdatePayload>; diff --git a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts index ab33ccf8ed..8dee8dff1b 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts @@ -46,7 +46,13 @@ describe('findStartNodes', () => { const node = createNodeData({ name: 'Basic Node' }); const graph = new DirectedGraph().addNode(node); - const startNodes = findStartNodes({ graph, trigger: node, destination: node }); + const startNodes = findStartNodes({ + graph, + trigger: node, + destination: node, + pinData: {}, + runData: {}, + }); expect(startNodes.size).toBe(1); expect(startNodes).toContainEqual(node); @@ -65,7 +71,13 @@ describe('findStartNodes', () => { // if the trigger has no run data { - const startNodes = findStartNodes({ graph, trigger, destination }); + const startNodes = findStartNodes({ + graph, + trigger, + destination, + pinData: {}, + runData: {}, + }); expect(startNodes.size).toBe(1); expect(startNodes).toContainEqual(trigger); @@ -77,7 +89,13 @@ describe('findStartNodes', () => { [trigger.name]: [toITaskData([{ data: { value: 1 } }])], }; - const startNodes = findStartNodes({ graph, trigger, destination, runData }); + const startNodes = findStartNodes({ + graph, + trigger, + destination, + runData, + pinData: {}, + }); expect(startNodes.size).toBe(1); expect(startNodes).toContainEqual(destination); @@ -112,7 +130,13 @@ describe('findStartNodes', () => { }; // ACT - const startNodes = findStartNodes({ graph, trigger, destination: node, runData }); + const startNodes = findStartNodes({ + graph, + trigger, + destination: node, + runData, + pinData: {}, + }); // ASSERT expect(startNodes.size).toBe(1); @@ -153,7 +177,13 @@ describe('findStartNodes', () => { { // ACT - const startNodes = findStartNodes({ graph, trigger, destination: node4 }); + const startNodes = findStartNodes({ + graph, + trigger, + destination: node4, + pinData: {}, + runData: {}, + }); // ASSERT expect(startNodes.size).toBe(1); @@ -172,7 +202,13 @@ describe('findStartNodes', () => { }; // ACT - const startNodes = findStartNodes({ graph, trigger, destination: node4, runData }); + const startNodes = findStartNodes({ + graph, + trigger, + destination: node4, + runData, + pinData: {}, + }); // ASSERT expect(startNodes.size).toBe(1); @@ -208,6 +244,7 @@ describe('findStartNodes', () => { runData: { [trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], }, + pinData: {}, }); // ASSERT @@ -243,6 +280,7 @@ describe('findStartNodes', () => { runData: { [trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 1 }])], }, + pinData: {}, }); // ASSERT @@ -283,6 +321,7 @@ describe('findStartNodes', () => { ]), ], }, + pinData: {}, }); // ASSERT @@ -321,6 +360,7 @@ describe('findStartNodes', () => { [node1.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], [node2.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], }, + pinData: {}, }); // ASSERT @@ -357,6 +397,7 @@ describe('findStartNodes', () => { [trigger.name]: [toITaskData([{ data: { value: 1 } }])], [node1.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 1 }])], }, + pinData: {}, }); // ASSERT @@ -389,7 +430,13 @@ describe('findStartNodes', () => { const pinData: IPinData = {}; // ACT - const startNodes = findStartNodes({ graph, trigger, destination: node2, runData, pinData }); + const startNodes = findStartNodes({ + graph, + trigger, + destination: node2, + runData, + pinData, + }); // ASSERT expect(startNodes.size).toBe(1); diff --git a/packages/core/src/PartialExecutionUtils/findStartNodes.ts b/packages/core/src/PartialExecutionUtils/findStartNodes.ts index 5eb036bd88..b3f4f95399 100644 --- a/packages/core/src/PartialExecutionUtils/findStartNodes.ts +++ b/packages/core/src/PartialExecutionUtils/findStartNodes.ts @@ -135,14 +135,14 @@ export function findStartNodes(options: { graph: DirectedGraph; trigger: INode; destination: INode; - runData?: IRunData; - pinData?: IPinData; + pinData: IPinData; + runData: IRunData; }): Set { const graph = options.graph; const trigger = options.trigger; const destination = options.destination; - const runData = options.runData ?? {}; - const pinData = options.pinData ?? {}; + const runData = { ...options.runData }; + const pinData = options.pinData; const startNodes = findStartNodesRecursive( graph, diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 8eab139209..07819a7102 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -4,6 +4,7 @@ /* eslint-disable @typescript-eslint/prefer-nullish-coalescing */ import * as assert from 'assert/strict'; import { setMaxListeners } from 'events'; +import { omit } from 'lodash'; import get from 'lodash/get'; import type { ExecutionBaseError, @@ -319,8 +320,9 @@ export class WorkflowExecute { runPartialWorkflow2( workflow: Workflow, runData: IRunData, + pinData: IPinData = {}, + dirtyNodeNames: string[] = [], destinationNodeName?: string, - pinData?: IPinData, ): PCancelable { // TODO: Refactor the call-site to make `destinationNodeName` a required // after removing the old partial execution flow. @@ -349,7 +351,8 @@ export class WorkflowExecute { const filteredNodes = subgraph.getNodes(); // 3. Find the Start Nodes - let startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData }); + runData = omit(runData, dirtyNodeNames); + let startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData, pinData }); // 4. Detect Cycles // 5. Handle Cycles diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index 0cec3fa116..3c9b5cd96f 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -1,4 +1,4 @@ -import type { IRun, WorkflowTestData } from 'n8n-workflow'; +import type { IPinData, IRun, IRunData, WorkflowTestData } from 'n8n-workflow'; import { ApplicationError, createDeferredPromise, @@ -6,17 +6,20 @@ import { Workflow, } from 'n8n-workflow'; +import { DirectedGraph } from '@/PartialExecutionUtils'; +import { createNodeData, toITaskData } from '@/PartialExecutionUtils/__tests__/helpers'; import { WorkflowExecute } from '@/WorkflowExecute'; import * as Helpers from './helpers'; import { legacyWorkflowExecuteTests, v1WorkflowExecuteTests } from './helpers/constants'; +const nodeTypes = Helpers.NodeTypes(); + describe('WorkflowExecute', () => { describe('v0 execution order', () => { const tests: WorkflowTestData[] = legacyWorkflowExecuteTests; const executionMode = 'manual'; - const nodeTypes = Helpers.NodeTypes(); for (const testData of tests) { test(testData.description, async () => { @@ -217,4 +220,49 @@ describe('WorkflowExecute', () => { expect(nodeExecutionOutput[0][0].json.data).toEqual(123); expect(nodeExecutionOutput.getHints()[0].message).toEqual('TEXT HINT'); }); + + describe('runPartialWorkflow2', () => { + // Dirty ► + // ┌───────┐1 ┌─────┐1 ┌─────┐ + // │trigger├──────►node1├──────►node2│ + // └───────┘ └─────┘ └─────┘ + test("deletes dirty nodes' run data", async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const nodeExecutionOrder: string[] = []; + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder); + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const workflow = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 }) + .toWorkflow({ name: '', active: false, nodeTypes }); + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { name: trigger.name } }])], + [node1.name]: [toITaskData([{ data: { name: node1.name } }])], + [node2.name]: [toITaskData([{ data: { name: node2.name } }])], + }; + const dirtyNodeNames = [node1.name]; + + jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn()); + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + 'node2', + ); + + // ASSERT + const fullRunData = workflowExecute.getFullRunData(new Date()); + expect(fullRunData.data.resultData.runData).toHaveProperty(trigger.name); + expect(fullRunData.data.resultData.runData).not.toHaveProperty(node1.name); + }); + }); }); diff --git a/packages/core/test/helpers/constants.ts b/packages/core/test/helpers/constants.ts index f3de1c667c..3043f455e9 100644 --- a/packages/core/test/helpers/constants.ts +++ b/packages/core/test/helpers/constants.ts @@ -7,6 +7,7 @@ import type { import { NodeConnectionType } from 'n8n-workflow'; import { If } from '../../../nodes-base/dist/nodes/If/If.node'; +import { ManualTrigger } from '../../../nodes-base/dist/nodes/ManualTrigger/ManualTrigger.node'; import { Merge } from '../../../nodes-base/dist/nodes/Merge/Merge.node'; import { NoOp } from '../../../nodes-base/dist/nodes/NoOp/NoOp.node'; import { Set } from '../../../nodes-base/dist/nodes/Set/Set.node'; @@ -33,6 +34,10 @@ export const predefinedNodesTypes: INodeTypeData = { type: new Start(), sourcePath: '', }, + 'n8n-nodes-base.manualTrigger': { + type: new ManualTrigger(), + sourcePath: '', + }, 'n8n-nodes-base.versionTest': { sourcePath: '', type: { diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index 4d12d6b1df..20b7079b6f 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -200,6 +200,7 @@ export interface IStartRunData { startNodes?: StartNodeData[]; destinationNode?: string; runData?: IRunData; + dirtyNodeNames?: string[]; } export interface ITableData { diff --git a/packages/editor-ui/src/composables/useRunWorkflow.test.ts b/packages/editor-ui/src/composables/useRunWorkflow.test.ts index 4b50ebea4b..4a09e2e943 100644 --- a/packages/editor-ui/src/composables/useRunWorkflow.test.ts +++ b/packages/editor-ui/src/composables/useRunWorkflow.test.ts @@ -2,7 +2,13 @@ import { setActivePinia } from 'pinia'; import { createTestingPinia } from '@pinia/testing'; import { useRouter } from 'vue-router'; import type router from 'vue-router'; -import { ExpressionError, type IPinData, type IRunData, type Workflow } from 'n8n-workflow'; +import { + ExpressionError, + type IPinData, + type IRunData, + type Workflow, + type IExecuteData, +} from 'n8n-workflow'; import { useRootStore } from '@/stores/root.store'; import { useRunWorkflow } from '@/composables/useRunWorkflow'; @@ -28,6 +34,8 @@ vi.mock('@/stores/workflows.store', () => ({ getNodeByName: vi.fn(), getExecution: vi.fn(), nodeIssuesExit: vi.fn(), + checkIfNodeHasChatParent: vi.fn(), + getParametersLastUpdate: vi.fn(), }), })); @@ -69,6 +77,7 @@ vi.mock('@/composables/useWorkflowHelpers', () => ({ saveCurrentWorkflow: vi.fn(), getWorkflowDataToSave: vi.fn(), setDocumentTitle: vi.fn(), + executeData: vi.fn(), }), })); @@ -262,6 +271,60 @@ describe('useRunWorkflow({ router })', () => { expect(result).toEqual(mockExecutionResponse); }); + it('should send dirty nodes for partial executions', async () => { + vi.mocked(useLocalStorage).mockReturnValueOnce(ref(1)); + const composable = useRunWorkflow({ router }); + const parentName = 'When clicking'; + const executeName = 'Code'; + vi.mocked(workflowsStore).getWorkflowRunData = { + [parentName]: [ + { + startTime: 1, + executionTime: 0, + source: [], + }, + ], + [executeName]: [ + { + startTime: 1, + executionTime: 8, + source: [ + { + previousNode: parentName, + }, + ], + }, + ], + }; + vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({ + name: 'Test Workflow', + getParentNodes: () => [parentName], + nodes: { [parentName]: {} }, + } as unknown as Workflow); + vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({ + nodes: [], + } as unknown as IWorkflowData); + vi.mocked(workflowHelpers).executeData.mockResolvedValue({ + data: {}, + node: {}, + source: null, + } as IExecuteData); + + vi.mocked(workflowsStore).checkIfNodeHasChatParent.mockReturnValue(false); + vi.mocked(workflowsStore).getParametersLastUpdate.mockImplementation((name: string) => { + if (name === executeName) return 2; + return undefined; + }); + + const { runWorkflow } = composable; + + await runWorkflow({ destinationNode: 'Code 1', source: 'Node.executeNode' }); + + expect(workflowsStore.runWorkflow).toHaveBeenCalledWith( + expect.objectContaining({ dirtyNodeNames: [executeName] }), + ); + }); + it('does not use the original run data if `PartialExecution.version` is set to 0', async () => { // ARRANGE const mockExecutionResponse = { executionId: '123' }; diff --git a/packages/editor-ui/src/composables/useRunWorkflow.ts b/packages/editor-ui/src/composables/useRunWorkflow.ts index 931f2df230..6b7dc9fda5 100644 --- a/packages/editor-ui/src/composables/useRunWorkflow.ts +++ b/packages/editor-ui/src/composables/useRunWorkflow.ts @@ -37,6 +37,25 @@ import { get } from 'lodash-es'; import { useExecutionsStore } from '@/stores/executions.store'; import { useLocalStorage } from '@vueuse/core'; +const getDirtyNodeNames = ( + runData: IRunData, + getParametersLastUpdate: (nodeName: string) => number | undefined, +): string[] | undefined => { + const dirtyNodeNames = Object.entries(runData).reduce((acc, [nodeName, tasks]) => { + if (!tasks.length) return acc; + + const updatedAt = getParametersLastUpdate(nodeName) ?? 0; + + if (updatedAt > tasks[0].startTime) { + acc.push(nodeName); + } + + return acc; + }, []); + + return dirtyNodeNames.length ? dirtyNodeNames : undefined; +}; + export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType }) { const nodeHelpers = useNodeHelpers(); const workflowHelpers = useWorkflowHelpers({ router: useRunWorkflowOpts.router }); @@ -244,6 +263,13 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType { }); }); }); + + describe('setNodeValue()', () => { + it('should update a node', () => { + const nodeName = 'Edit Fields'; + workflowsStore.addNode({ + parameters: {}, + id: '554c7ff4-7ee2-407c-8931-e34234c5056a', + name: nodeName, + type: 'n8n-nodes-base.set', + position: [680, 180], + typeVersion: 3.4, + }); + + expect(workflowsStore.nodeMetadata[nodeName].parametersLastUpdatedAt).toBe(undefined); + + workflowsStore.setNodeValue({ name: 'Edit Fields', key: 'executeOnce', value: true }); + + expect(workflowsStore.workflow.nodes[0].executeOnce).toBe(true); + expect(workflowsStore.nodeMetadata[nodeName].parametersLastUpdatedAt).toEqual( + expect.any(Number), + ); + }); + }); + + describe('setNodePositionById()', () => { + it('should NOT update parametersLastUpdatedAt', () => { + const nodeName = 'Edit Fields'; + const nodeId = '554c7ff4-7ee2-407c-8931-e34234c5056a'; + workflowsStore.addNode({ + parameters: {}, + id: nodeId, + name: nodeName, + type: 'n8n-nodes-base.set', + position: [680, 180], + typeVersion: 3.4, + }); + + expect(workflowsStore.nodeMetadata[nodeName].parametersLastUpdatedAt).toBe(undefined); + + workflowsStore.setNodePositionById(nodeId, [0, 0]); + + expect(workflowsStore.workflow.nodes[0].position).toStrictEqual([0, 0]); + expect(workflowsStore.nodeMetadata[nodeName].parametersLastUpdatedAt).toBe(undefined); + }); + }); }); function getMockEditFieldsNode() { diff --git a/packages/editor-ui/src/stores/workflows.store.ts b/packages/editor-ui/src/stores/workflows.store.ts index 58452eaf34..0eb235dea2 100644 --- a/packages/editor-ui/src/stores/workflows.store.ts +++ b/packages/editor-ui/src/stores/workflows.store.ts @@ -1200,6 +1200,10 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { updateNodeAtIndex(nodeIndex, { [updateInformation.key]: updateInformation.value, }); + + if (updateInformation.key !== 'position') { + nodeMetadata.value[workflow.value.nodes[nodeIndex].name].parametersLastUpdatedAt = Date.now(); + } } function setNodeParameters(updateInformation: IUpdateInformation, append?: boolean): void { diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index e766a4aa21..0a7a5d0379 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -2274,6 +2274,7 @@ export interface IWorkflowExecutionDataProcess { * PARTIAL_EXECUTION_VERSION_DEFAULT */ partialExecutionVersion?: string; + dirtyNodeNames?: string[]; } export interface ExecuteWorkflowOptions {