From 3320436a6fdf8472b3843b9fe8d4de7af7f5ef5c Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Tue, 26 Nov 2024 13:32:39 +0100 Subject: [PATCH] fix: Retain execution data between partial executions (new flow) (#11828) --- packages/@n8n/api-types/src/push/execution.ts | 1 + .../src/workflow-execute-additional-data.ts | 5 +- packages/cli/src/workflow-runner.ts | 7 +- .../__tests__/helpers.ts | 3 +- packages/core/src/WorkflowExecute.ts | 2 +- packages/core/test/WorkflowExecute.test.ts | 2 +- .../src/composables/usePushConnection.test.ts | 62 ++++++++++++++- .../src/composables/usePushConnection.ts | 12 ++- .../src/composables/useRunWorkflow.test.ts | 75 ++++++++++++++++++- .../src/composables/useRunWorkflow.ts | 2 +- packages/workflow/src/Interfaces.ts | 2 +- 11 files changed, 159 insertions(+), 14 deletions(-) diff --git a/packages/@n8n/api-types/src/push/execution.ts b/packages/@n8n/api-types/src/push/execution.ts index 7a1c1377d9..320b3dc264 100644 --- a/packages/@n8n/api-types/src/push/execution.ts +++ b/packages/@n8n/api-types/src/push/execution.ts @@ -9,6 +9,7 @@ type ExecutionStarted = { workflowId: string; workflowName?: string; retryOf?: string; + flattedRunData: string; }; }; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 1d73ca379b..b7d966afa5 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -281,7 +281,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { }, ], workflowExecuteBefore: [ - async function (this: WorkflowHooks): Promise { + async function (this: WorkflowHooks, _workflow, data): Promise { const { pushRef, executionId } = this; const { id: workflowId, name: workflowName } = this.workflowData; logger.debug('Executing hook (hookFunctionsPush)', { @@ -302,6 +302,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { retryOf: this.retryOf, workflowId, workflowName, + flattedRunData: data?.resultData.runData + ? stringify(data.resultData.runData) + : stringify({}), }, pushRef, ); diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 38ead8896e..1dd646c1c1 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -137,7 +137,10 @@ export class WorkflowRunner { // Create a failed execution with the data for the node, save it and abort execution const runData = generateFailedExecutionFromError(data.executionMode, error, error.node); const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); - await workflowHooks.executeHookFunctions('workflowExecuteBefore', []); + await workflowHooks.executeHookFunctions('workflowExecuteBefore', [ + undefined, + data.executionData, + ]); await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); responsePromise?.reject(error); this.activeExecutions.finalizeExecution(executionId); @@ -401,7 +404,7 @@ export class WorkflowRunner { // Normally also workflow should be supplied here but as it only used for sending // data to editor-UI is not needed. - await hooks.executeHookFunctions('workflowExecuteBefore', []); + await hooks.executeHookFunctions('workflowExecuteBefore', [undefined, data.executionData]); } catch (error) { // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. diff --git a/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts b/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts index 72b1efa30c..6a6c8a88db 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts @@ -5,13 +5,14 @@ interface StubNode { name: string; parameters?: INodeParameters; disabled?: boolean; + type?: string; } export function createNodeData(stubData: StubNode): INode { return { name: stubData.name, parameters: stubData.parameters ?? {}, - type: 'test.set', + type: stubData.type ?? 'n8n-nodes-base.set', typeVersion: 1, id: 'uuid-1234', position: [100, 100], diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index c6e0316038..8eab139209 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -948,7 +948,7 @@ export class WorkflowExecute { const returnPromise = (async () => { try { if (!this.additionalData.restartExecutionId) { - await this.executeHook('workflowExecuteBefore', [workflow]); + await this.executeHook('workflowExecuteBefore', [workflow, this.runExecutionData]); } } catch (error) { const e = error as unknown as ExecutionBaseError; diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index ab69a7574b..0cec3fa116 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -206,7 +206,7 @@ describe('WorkflowExecute', () => { } }); - describe('WorkflowExecute, NodeExecutionOutput type test', () => { + test('WorkflowExecute, NodeExecutionOutput type test', () => { //TODO Add more tests here when execution hints are added to some node types const nodeExecutionOutput = new NodeExecutionOutput( [[{ json: { data: 123 } }]], diff --git a/packages/editor-ui/src/composables/usePushConnection.test.ts b/packages/editor-ui/src/composables/usePushConnection.test.ts index 888c36a45d..c2901249d6 100644 --- a/packages/editor-ui/src/composables/usePushConnection.test.ts +++ b/packages/editor-ui/src/composables/usePushConnection.test.ts @@ -2,7 +2,7 @@ import { stringify } from 'flatted'; import { useRouter } from 'vue-router'; import { createPinia, setActivePinia } from 'pinia'; import type { PushMessage, PushPayload } from '@n8n/api-types'; -import type { ITaskData, WorkflowOperationError } from 'n8n-workflow'; +import type { ITaskData, WorkflowOperationError, IRunData } from 'n8n-workflow'; import { usePushConnection } from '@/composables/usePushConnection'; import { usePushConnectionStore } from '@/stores/pushConnection.store'; @@ -10,6 +10,7 @@ import { useOrchestrationStore } from '@/stores/orchestration.store'; import { useUIStore } from '@/stores/ui.store'; import { useWorkflowsStore } from '@/stores/workflows.store'; import { useToast } from '@/composables/useToast'; +import type { IExecutionResponse } from '@/Interface'; vi.mock('vue-router', () => { return { @@ -162,7 +163,7 @@ describe('usePushConnection()', () => { expect(result).toBeTruthy(); expect(workflowsStore.workflowExecutionData).toBeDefined(); - expect(uiStore.isActionActive['workflowRunning']).toBeTruthy(); + expect(uiStore.isActionActive.workflowRunning).toBeTruthy(); expect(toast.showMessage).toHaveBeenCalledWith({ title: 'Workflow executed successfully', @@ -236,5 +237,62 @@ describe('usePushConnection()', () => { expect(pushConnection.retryTimeout).not.toBeNull(); }); }); + + describe('executionStarted', async () => { + it("enqueues messages if we don't have the active execution id yet", async () => { + uiStore.isActionActive.workflowRunning = true; + const event: PushMessage = { + type: 'executionStarted', + data: { + executionId: '1', + mode: 'manual', + startedAt: new Date(), + workflowId: '1', + flattedRunData: stringify({}), + }, + }; + + expect(pushConnection.retryTimeout.value).toBeNull(); + expect(pushConnection.pushMessageQueue.value.length).toBe(0); + + const result = await pushConnection.pushMessageReceived(event); + + expect(result).toBe(false); + expect(pushConnection.pushMessageQueue.value).toHaveLength(1); + expect(pushConnection.pushMessageQueue.value).toContainEqual({ + message: event, + retriesLeft: 5, + }); + expect(pushConnection.retryTimeout).not.toBeNull(); + }); + + it('overwrites the run data in the workflow store', async () => { + // ARRANGE + uiStore.isActionActive.workflowRunning = true; + const oldRunData: IRunData = { foo: [] }; + workflowsStore.workflowExecutionData = { + data: { resultData: { runData: oldRunData } }, + } as IExecutionResponse; + const newRunData: IRunData = { bar: [] }; + const event: PushMessage = { + type: 'executionStarted', + data: { + executionId: '1', + flattedRunData: stringify(newRunData), + mode: 'manual', + startedAt: new Date(), + workflowId: '1', + }, + }; + workflowsStore.activeExecutionId = event.data.executionId; + + // ACT + const result = await pushConnection.pushMessageReceived(event); + + // ASSERT + expect(result).toBe(true); + expect(workflowsStore.workflowExecutionData.data?.resultData.runData).toEqual(newRunData); + }); + }); }); }); diff --git a/packages/editor-ui/src/composables/usePushConnection.ts b/packages/editor-ui/src/composables/usePushConnection.ts index d5ecdefc71..ee511958e3 100644 --- a/packages/editor-ui/src/composables/usePushConnection.ts +++ b/packages/editor-ui/src/composables/usePushConnection.ts @@ -146,7 +146,11 @@ export function usePushConnection({ router }: { router: ReturnType ({ useWorkflowsStore: vi.fn().mockReturnValue({ @@ -29,6 +31,16 @@ vi.mock('@/stores/workflows.store', () => ({ }), })); +vi.mock('@vueuse/core', async () => { + // eslint-disable-next-line @typescript-eslint/consistent-type-imports + const originalModule = await vi.importActual('@vueuse/core'); + + return { + ...originalModule, // Keep all original exports + useLocalStorage: vi.fn().mockReturnValue({ value: undefined }), // Mock useLocalStorage + }; +}); + vi.mock('@/composables/useTelemetry', () => ({ useTelemetry: vi.fn().mockReturnValue({ track: vi.fn() }), })); @@ -99,6 +111,7 @@ describe('useRunWorkflow({ router })', () => { beforeEach(() => { uiStore.activeActions = []; + vi.clearAllMocks(); }); describe('runWorkflowApi()', () => { @@ -168,8 +181,6 @@ describe('useRunWorkflow({ router })', () => { it('should prevent execution and show error message when workflow is active with single webhook trigger', async () => { const pinia = createTestingPinia({ stubActions: false }); setActivePinia(pinia); - const router = useRouter(); - const workflowsStore = useWorkflowsStore(); const toast = useToast(); const i18n = useI18n(); const { runWorkflow } = useRunWorkflow({ router }); @@ -250,6 +261,66 @@ describe('useRunWorkflow({ router })', () => { const result = await runWorkflow({}); expect(result).toEqual(mockExecutionResponse); }); + + it('does not use the original run data if `PartialExecution.version` is set to 0', async () => { + // ARRANGE + const mockExecutionResponse = { executionId: '123' }; + const mockRunData = { nodeName: [] }; + const { runWorkflow } = useRunWorkflow({ router }); + + vi.mocked(useLocalStorage).mockReturnValueOnce(ref(0)); + vi.mocked(rootStore).pushConnectionActive = true; + vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse); + vi.mocked(workflowsStore).nodesIssuesExist = false; + vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({ + name: 'Test Workflow', + } as Workflow); + vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({ + id: 'workflowId', + nodes: [], + } as unknown as IWorkflowData); + vi.mocked(workflowsStore).getWorkflowRunData = mockRunData; + + // ACT + const result = await runWorkflow({}); + + // ASSERT + expect(result).toEqual(mockExecutionResponse); + expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalledTimes(1); + expect(vi.mocked(workflowsStore.setWorkflowExecutionData).mock.calls[0][0]).toMatchObject({ + data: { resultData: { runData: {} } }, + }); + }); + + it('retains the original run data if `PartialExecution.version` is set to 1', async () => { + // ARRANGE + const mockExecutionResponse = { executionId: '123' }; + const mockRunData = { nodeName: [] }; + const { runWorkflow } = useRunWorkflow({ router }); + + vi.mocked(useLocalStorage).mockReturnValueOnce(ref(1)); + vi.mocked(rootStore).pushConnectionActive = true; + vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse); + vi.mocked(workflowsStore).nodesIssuesExist = false; + vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({ + name: 'Test Workflow', + } as Workflow); + vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({ + id: 'workflowId', + nodes: [], + } as unknown as IWorkflowData); + vi.mocked(workflowsStore).getWorkflowRunData = mockRunData; + + // ACT + const result = await runWorkflow({}); + + // ASSERT + expect(result).toEqual(mockExecutionResponse); + expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalledTimes(1); + expect(vi.mocked(workflowsStore.setWorkflowExecutionData).mock.calls[0][0]).toMatchObject({ + data: { resultData: { runData: mockRunData } }, + }); + }); }); describe('consolidateRunDataAndStartNodes()', () => { diff --git a/packages/editor-ui/src/composables/useRunWorkflow.ts b/packages/editor-ui/src/composables/useRunWorkflow.ts index eaa91a6228..931f2df230 100644 --- a/packages/editor-ui/src/composables/useRunWorkflow.ts +++ b/packages/editor-ui/src/composables/useRunWorkflow.ts @@ -259,7 +259,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType; nodeExecuteBefore?: Array<(nodeName: string) => Promise>; workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise>; - workflowExecuteBefore?: Array<(workflow: Workflow, data: IRunExecutionData) => Promise>; + workflowExecuteBefore?: Array<(workflow?: Workflow, data?: IRunExecutionData) => Promise>; sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; }