From d41ca832dc6e96440004cc69366ff4e034b5eed9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Fri, 7 Feb 2025 18:16:37 +0100 Subject: [PATCH] refactor(core): Move ExecutionLifecycleHooks to core (#13042) --- .../cli/src/__tests__/workflow-runner.test.ts | 46 +- .../execution-lifecycle-hooks.test.ts | 342 ++++---- .../execution-lifecycle-hooks.ts | 796 ++++++++---------- .../executions/execution-recovery.service.ts | 6 +- packages/cli/src/scaling/job-processor.ts | 27 +- .../src/workflow-execute-additional-data.ts | 4 +- packages/cli/src/workflow-runner.ts | 81 +- .../__tests__/node-execute-functions.test.ts | 23 +- .../execution-lifecycle-hooks.test.ts | 113 +++ .../__tests__/triggers-and-pollers.test.ts | 20 +- .../__tests__/workflow-execute.test.ts | 11 +- .../execution-lifecycle-hooks.ts | 119 +++ packages/core/src/execution-engine/index.ts | 1 + .../node-execution-context/execute-context.ts | 2 +- .../supply-data-context.ts | 4 +- .../execution-engine/triggers-and-pollers.ts | 61 +- .../src/execution-engine/workflow-execute.ts | 39 +- packages/core/src/node-execute-functions.ts | 2 +- packages/core/test/helpers/index.ts | 24 +- packages/nodes-base/test/nodes/Helpers.ts | 23 +- .../nodes-base/test/nodes/TriggerHelpers.ts | 6 +- packages/workflow/src/Interfaces.ts | 13 - packages/workflow/src/WorkflowHooks.ts | 33 - packages/workflow/src/index.ts | 1 - 24 files changed, 911 insertions(+), 886 deletions(-) create mode 100644 packages/core/src/execution-engine/__tests__/execution-lifecycle-hooks.test.ts create mode 100644 packages/core/src/execution-engine/execution-lifecycle-hooks.ts delete mode 100644 packages/workflow/src/WorkflowHooks.ts diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 7330f85997..1de4ddef6b 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -12,17 +12,14 @@ import type { IWorkflowExecutionDataProcess, StartNodeData, } from 'n8n-workflow'; -import { - Workflow, - WorkflowHooks, - type ExecutionError, - type IWorkflowExecuteHooks, -} from 'n8n-workflow'; +import { Workflow, type ExecutionError } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; +import type { ExecutionEntity } from '@/databases/entities/execution-entity'; import type { User } from '@/databases/entities/user'; +import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import { Telemetry } from '@/telemetry'; import { PermissionChecker } from '@/user-management/permission-checker'; @@ -36,25 +33,14 @@ import { setupTestServer } from '@test-integration/utils'; let owner: User; let runner: WorkflowRunner; -let hookFunctions: IWorkflowExecuteHooks; setupTestServer({ endpointGroups: [] }); mockInstance(Telemetry); -class Watchers { - workflowExecuteAfter = jest.fn(); -} -const watchers = new Watchers(); -const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter'); - beforeAll(async () => { owner = await createUser({ role: 'global:owner' }); runner = Container.get(WorkflowRunner); - - hookFunctions = { - workflowExecuteAfter: [watchers.workflowExecuteAfter], - }; }); afterAll(() => { @@ -67,6 +53,20 @@ beforeEach(async () => { }); describe('processError', () => { + let workflow: WorkflowEntity; + let execution: ExecutionEntity; + let hooks: core.ExecutionLifecycleHooks; + + const watcher = mock<{ workflowExecuteAfter: () => Promise }>(); + + beforeEach(async () => { + jest.clearAllMocks(); + workflow = await createWorkflow({}, owner); + execution = await createExecution({ status: 'success', finished: true }, workflow); + hooks = new core.ExecutionLifecycleHooks('webhook', execution.id, workflow); + hooks.addHandler('workflowExecuteAfter', watcher.workflowExecuteAfter); + }); + test('processError should return early in Bull stalled edge case', async () => { const workflow = await createWorkflow({}, owner); const execution = await createExecution( @@ -82,9 +82,9 @@ describe('processError', () => { new Date(), 'webhook', execution.id, - new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + hooks, ); - expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0); + expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0); }); test('processError should return early if the error is `ExecutionNotFoundError`', async () => { @@ -95,9 +95,9 @@ describe('processError', () => { new Date(), 'webhook', execution.id, - new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + hooks, ); - expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0); + expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0); }); test('processError should process error', async () => { @@ -119,9 +119,9 @@ describe('processError', () => { new Date(), 'webhook', execution.id, - new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + hooks, ); - expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1); + expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts index 4d22727962..747cd775c0 100644 --- a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts @@ -1,7 +1,13 @@ import { stringify } from 'flatted'; import { mock } from 'jest-mock-extended'; -import { BinaryDataService, ErrorReporter, InstanceSettings, Logger } from 'n8n-core'; -import { ExpressionError, WorkflowHooks } from 'n8n-workflow'; +import { + BinaryDataService, + ErrorReporter, + InstanceSettings, + Logger, + ExecutionLifecycleHooks, +} from 'n8n-core'; +import { ExpressionError } from 'n8n-workflow'; import type { IRunExecutionData, ITaskData, @@ -10,6 +16,7 @@ import type { IRun, INode, IWorkflowBase, + WorkflowExecuteMode, } from 'n8n-workflow'; import config from '@/config'; @@ -25,10 +32,10 @@ import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.serv import { mockInstance } from '@test/mocking'; import { - getWorkflowHooksIntegrated, - getWorkflowHooksMain, - getWorkflowHooksWorkerExecuter, - getWorkflowHooksWorkerMain, + getLifecycleHooksForSubExecutions, + getLifecycleHooksForRegularMain, + getLifecycleHooksForScalingWorker, + getLifecycleHooksForScalingMain, } from '../execution-lifecycle-hooks'; describe('Execution Lifecycle Hooks', () => { @@ -79,14 +86,13 @@ describe('Execution Lifecycle Hooks', () => { waitTill: new Date(), }); const expressionError = new ExpressionError('Error'); - const executionMode = 'manual'; const pushRef = 'test-push-ref'; const retryOf = 'test-retry-of'; const now = new Date('2025-01-13T18:25:50.267Z'); jest.useFakeTimers({ now }); - let hooks: WorkflowHooks; + let lifecycleHooks: ExecutionLifecycleHooks; beforeEach(() => { jest.clearAllMocks(); @@ -107,7 +113,7 @@ describe('Execution Lifecycle Hooks', () => { const workflowEventTests = () => { describe('workflowExecuteBefore', () => { it('should emit workflow-pre-execute events', async () => { - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', { executionId, @@ -118,7 +124,7 @@ describe('Execution Lifecycle Hooks', () => { describe('workflowExecuteAfter', () => { it('should emit workflow-post-execute events', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(eventService.emit).toHaveBeenCalledWith('workflow-post-execute', { executionId, @@ -128,7 +134,7 @@ describe('Execution Lifecycle Hooks', () => { }); it('should not emit workflow-post-execute events for waiting executions', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [waitingRun, {}]); expect(eventService.emit).not.toHaveBeenCalledWith('workflow-post-execute'); }); @@ -138,7 +144,7 @@ describe('Execution Lifecycle Hooks', () => { const nodeEventsTests = () => { describe('nodeExecuteBefore', () => { it('should emit node-pre-execute event', async () => { - await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); + await lifecycleHooks.runHook('nodeExecuteBefore', [nodeName]); expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', { executionId, @@ -150,11 +156,7 @@ describe('Execution Lifecycle Hooks', () => { describe('nodeExecuteAfter', () => { it('should emit node-post-execute event', async () => { - await hooks.executeHookFunctions('nodeExecuteAfter', [ - nodeName, - taskData, - runExecutionData, - ]); + await lifecycleHooks.runHook('nodeExecuteAfter', [nodeName, taskData, runExecutionData]); expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', { executionId, @@ -168,18 +170,15 @@ describe('Execution Lifecycle Hooks', () => { const externalHooksTests = () => { describe('workflowExecuteBefore', () => { it('should run workflow.preExecute hook', async () => { - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); - expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [ - workflow, - executionMode, - ]); + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [workflow, 'manual']); }); }); describe('workflowExecuteAfter', () => { it('should run workflow.postExecute hook', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(externalHooks.run).toHaveBeenCalledWith('workflow.postExecute', [ successfulRun, @@ -193,7 +192,7 @@ describe('Execution Lifecycle Hooks', () => { const statisticsTests = () => { describe('statistics events', () => { it('workflowExecuteAfter should emit workflowExecutionCompleted statistics event', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { workflowData, @@ -202,7 +201,7 @@ describe('Execution Lifecycle Hooks', () => { }); it('nodeFetchedData should handle nodeFetchedData statistics event', async () => { - await hooks.executeHookFunctions('nodeFetchedData', [workflowId, node]); + await lifecycleHooks.runHook('nodeFetchedData', [workflowId, node]); expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', { workflowId, @@ -212,12 +211,15 @@ describe('Execution Lifecycle Hooks', () => { }); }; - describe('getWorkflowHooksMain', () => { - const createHooks = () => - getWorkflowHooksMain({ executionMode, workflowData, pushRef, retryOf }, executionId); + describe('getLifecycleHooksForRegularMain', () => { + const createHooks = (executionMode: WorkflowExecuteMode = 'manual') => + getLifecycleHooksForRegularMain( + { executionMode, workflowData, pushRef, retryOf }, + executionId, + ); beforeEach(() => { - hooks = createHooks(); + lifecycleHooks = createHooks(); }); workflowEventTests(); @@ -226,23 +228,23 @@ describe('Execution Lifecycle Hooks', () => { statisticsTests(); it('should setup the correct set of hooks', () => { - expect(hooks).toBeInstanceOf(WorkflowHooks); - expect(hooks.mode).toBe('manual'); - expect(hooks.executionId).toBe(executionId); - expect(hooks.workflowData).toEqual(workflowData); + expect(lifecycleHooks).toBeInstanceOf(ExecutionLifecycleHooks); + expect(lifecycleHooks.mode).toBe('manual'); + expect(lifecycleHooks.executionId).toBe(executionId); + expect(lifecycleHooks.workflowData).toEqual(workflowData); - const { hookFunctions } = hooks; - expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(2); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(3); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(5); - expect(hookFunctions.nodeFetchedData).toHaveLength(1); - expect(hookFunctions.sendResponse).toHaveLength(0); + const { handlers } = lifecycleHooks; + expect(handlers.nodeExecuteBefore).toHaveLength(2); + expect(handlers.nodeExecuteAfter).toHaveLength(2); + expect(handlers.workflowExecuteBefore).toHaveLength(3); + expect(handlers.workflowExecuteAfter).toHaveLength(5); + expect(handlers.nodeFetchedData).toHaveLength(1); + expect(handlers.sendResponse).toHaveLength(0); }); describe('nodeExecuteBefore', () => { it('should send nodeExecuteBefore push event', async () => { - await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); + await lifecycleHooks.runHook('nodeExecuteBefore', [nodeName]); expect(push.send).toHaveBeenCalledWith( { type: 'nodeExecuteBefore', data: { executionId, nodeName } }, @@ -253,11 +255,7 @@ describe('Execution Lifecycle Hooks', () => { describe('nodeExecuteAfter', () => { it('should send nodeExecuteAfter push event', async () => { - await hooks.executeHookFunctions('nodeExecuteAfter', [ - nodeName, - taskData, - runExecutionData, - ]); + await lifecycleHooks.runHook('nodeExecuteAfter', [nodeName, taskData, runExecutionData]); expect(push.send).toHaveBeenCalledWith( { type: 'nodeExecuteAfter', data: { executionId, nodeName, data: taskData } }, @@ -267,15 +265,11 @@ describe('Execution Lifecycle Hooks', () => { it('should save execution progress when enabled', async () => { workflowData.settings = { saveExecutionProgress: true }; - hooks = createHooks(); + lifecycleHooks = createHooks(); - expect(hooks.hookFunctions.nodeExecuteAfter).toHaveLength(3); + expect(lifecycleHooks.handlers.nodeExecuteAfter).toHaveLength(3); - await hooks.executeHookFunctions('nodeExecuteAfter', [ - nodeName, - taskData, - runExecutionData, - ]); + await lifecycleHooks.runHook('nodeExecuteAfter', [nodeName, taskData, runExecutionData]); expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(executionId, { includeData: true, @@ -285,15 +279,11 @@ describe('Execution Lifecycle Hooks', () => { it('should not save execution progress when disabled', async () => { workflowData.settings = { saveExecutionProgress: false }; - hooks = createHooks(); + lifecycleHooks = createHooks(); - expect(hooks.hookFunctions.nodeExecuteAfter).toHaveLength(2); + expect(lifecycleHooks.handlers.nodeExecuteAfter).toHaveLength(2); - await hooks.executeHookFunctions('nodeExecuteAfter', [ - nodeName, - taskData, - runExecutionData, - ]); + await lifecycleHooks.runHook('nodeExecuteAfter', [nodeName, taskData, runExecutionData]); expect(executionRepository.findSingleExecution).not.toHaveBeenCalled(); }); @@ -301,14 +291,14 @@ describe('Execution Lifecycle Hooks', () => { describe('workflowExecuteBefore', () => { it('should send executionStarted push event', async () => { - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); expect(push.send).toHaveBeenCalledWith( { type: 'executionStarted', data: { executionId, - mode: executionMode, + mode: 'manual', retryOf, workflowId: 'test-workflow-id', workflowName: 'Test Workflow', @@ -321,18 +311,15 @@ describe('Execution Lifecycle Hooks', () => { }); it('should run workflow.preExecute external hook', async () => { - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); - expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [ - workflow, - executionMode, - ]); + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [workflow, 'manual']); }); }); describe('workflowExecuteAfter', () => { it('should send executionFinished push event', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(push.send).toHaveBeenCalledWith( { type: 'executionFinished', @@ -348,7 +335,7 @@ describe('Execution Lifecycle Hooks', () => { }); it('should send executionWaiting push event', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [waitingRun, {}]); expect(push.send).toHaveBeenCalledWith( { @@ -361,17 +348,15 @@ describe('Execution Lifecycle Hooks', () => { describe('saving static data', () => { it('should skip saving static data for manual executions', async () => { - hooks.mode = 'manual'; - - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, staticData]); expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled(); }); it('should save static data for prod executions', async () => { - hooks.mode = 'trigger'; + lifecycleHooks = createHooks('trigger'); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, staticData]); expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith( workflowId, @@ -380,11 +365,12 @@ describe('Execution Lifecycle Hooks', () => { }); it('should handle static data saving errors', async () => { - hooks.mode = 'trigger'; + lifecycleHooks = createHooks('trigger'); + const error = new Error('Static data save failed'); workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, staticData]); expect(errorReporter.error).toHaveBeenCalledWith(error); }); @@ -392,7 +378,7 @@ describe('Execution Lifecycle Hooks', () => { describe('saving execution data', () => { it('should update execution with proper data', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( executionId, @@ -406,31 +392,31 @@ describe('Execution Lifecycle Hooks', () => { it('should not delete unfinished executions', async () => { const unfinishedRun = mock({ finished: false, status: 'running' }); - await hooks.executeHookFunctions('workflowExecuteAfter', [unfinishedRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [unfinishedRun, {}]); expect(executionRepository.hardDelete).not.toHaveBeenCalled(); }); it('should not delete waiting executions', async () => { - await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [waitingRun, {}]); expect(executionRepository.hardDelete).not.toHaveBeenCalled(); }); it('should soft delete manual executions when manual saving is disabled', async () => { - hooks.workflowData.settings = { saveManualExecutions: false }; - hooks = createHooks(); + lifecycleHooks.workflowData.settings = { saveManualExecutions: false }; + lifecycleHooks = createHooks(); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(executionRepository.softDelete).toHaveBeenCalledWith(executionId); }); it('should not soft delete manual executions with waitTill', async () => { - hooks.workflowData.settings = { saveManualExecutions: false }; - hooks = createHooks(); + lifecycleHooks.workflowData.settings = { saveManualExecutions: false }; + lifecycleHooks = createHooks(); - await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [waitingRun, {}]); expect(executionRepository.softDelete).not.toHaveBeenCalled(); }); @@ -438,15 +424,14 @@ describe('Execution Lifecycle Hooks', () => { describe('error workflow', () => { it('should not execute error workflow for manual executions', async () => { - hooks.mode = 'manual'; - - await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); expect(workflowExecutionService.executeErrorWorkflow).not.toHaveBeenCalled(); }); it('should execute error workflow for failed non-manual executions', async () => { - hooks.mode = 'trigger'; + lifecycleHooks = createHooks('trigger'); + const errorWorkflow = 'error-workflow-id'; workflowData.settings = { errorWorkflow }; const project = mock(); @@ -454,7 +439,7 @@ describe('Execution Lifecycle Hooks', () => { .calledWith(workflowId) .mockResolvedValue(project); - await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); expect(workflowExecutionService.executeErrorWorkflow).toHaveBeenCalledWith( errorWorkflow, @@ -479,7 +464,8 @@ describe('Execution Lifecycle Hooks', () => { it('should restore binary data IDs after workflow execution for webhooks', async () => { config.set('binaryDataManager.mode', 'filesystem'); - hooks.mode = 'webhook'; + lifecycleHooks = createHooks('webhook'); + (successfulRun.data.resultData.runData = { [nodeName]: [ { @@ -505,7 +491,7 @@ describe('Execution Lifecycle Hooks', () => { }, ], }), - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(binaryDataService.rename).toHaveBeenCalledWith( 'workflows/test-workflow-id/executions/temp/binary_data/123', @@ -516,33 +502,32 @@ describe('Execution Lifecycle Hooks', () => { describe("when pushRef isn't set", () => { beforeEach(() => { - hooks = getWorkflowHooksMain({ executionMode, workflowData, retryOf }, executionId); + lifecycleHooks = getLifecycleHooksForRegularMain( + { executionMode: 'manual', workflowData, retryOf }, + executionId, + ); }); it('should not setup any push hooks', async () => { - const { hookFunctions } = hooks; - expect(hookFunctions.nodeExecuteBefore).toHaveLength(1); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(1); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); + const { handlers } = lifecycleHooks; + expect(handlers.nodeExecuteBefore).toHaveLength(1); + expect(handlers.nodeExecuteAfter).toHaveLength(1); + expect(handlers.workflowExecuteBefore).toHaveLength(2); + expect(handlers.workflowExecuteAfter).toHaveLength(4); - await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); - await hooks.executeHookFunctions('nodeExecuteAfter', [ - nodeName, - taskData, - runExecutionData, - ]); - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('nodeExecuteBefore', [nodeName]); + await lifecycleHooks.runHook('nodeExecuteAfter', [nodeName, taskData, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(push.send).not.toHaveBeenCalled(); }); }); }); - describe('getWorkflowHooksWorkerMain', () => { + describe('getLifecycleHooksForScalingMain', () => { beforeEach(() => { - hooks = getWorkflowHooksWorkerMain(executionMode, executionId, workflowData, { + lifecycleHooks = getLifecycleHooksForScalingMain('manual', executionId, workflowData, { pushRef, retryOf, }); @@ -552,28 +537,25 @@ describe('Execution Lifecycle Hooks', () => { externalHooksTests(); it('should setup the correct set of hooks', () => { - expect(hooks).toBeInstanceOf(WorkflowHooks); - expect(hooks.mode).toBe('manual'); - expect(hooks.executionId).toBe(executionId); - expect(hooks.workflowData).toEqual(workflowData); + expect(lifecycleHooks).toBeInstanceOf(ExecutionLifecycleHooks); + expect(lifecycleHooks.mode).toBe('manual'); + expect(lifecycleHooks.executionId).toBe(executionId); + expect(lifecycleHooks.workflowData).toEqual(workflowData); - const { hookFunctions } = hooks; - expect(hookFunctions.nodeExecuteBefore).toHaveLength(0); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(0); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); - expect(hookFunctions.nodeFetchedData).toHaveLength(0); - expect(hookFunctions.sendResponse).toHaveLength(0); + const { handlers } = lifecycleHooks; + expect(handlers.nodeExecuteBefore).toHaveLength(0); + expect(handlers.nodeExecuteAfter).toHaveLength(0); + expect(handlers.workflowExecuteBefore).toHaveLength(2); + expect(handlers.workflowExecuteAfter).toHaveLength(4); + expect(handlers.nodeFetchedData).toHaveLength(0); + expect(handlers.sendResponse).toHaveLength(0); }); describe('workflowExecuteBefore', () => { it('should run the workflow.preExecute external hook', async () => { - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); - expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [ - workflow, - executionMode, - ]); + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [workflow, 'manual']); }); }); @@ -583,12 +565,17 @@ describe('Execution Lifecycle Hooks', () => { saveDataSuccessExecution: 'none', saveDataErrorExecution: 'all', }; - const hooks = getWorkflowHooksWorkerMain('webhook', executionId, workflowData, { - pushRef, - retryOf, - }); + const lifecycleHooks = getLifecycleHooksForScalingMain( + 'webhook', + executionId, + workflowData, + { + pushRef, + retryOf, + }, + ); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); expect(executionRepository.hardDelete).toHaveBeenCalledWith({ workflowId, @@ -601,12 +588,17 @@ describe('Execution Lifecycle Hooks', () => { saveDataSuccessExecution: 'all', saveDataErrorExecution: 'none', }; - const hooks = getWorkflowHooksWorkerMain('webhook', executionId, workflowData, { - pushRef, - retryOf, - }); + const lifecycleHooks = getLifecycleHooksForScalingMain( + 'webhook', + executionId, + workflowData, + { + pushRef, + retryOf, + }, + ); - await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); expect(executionRepository.hardDelete).toHaveBeenCalledWith({ workflowId, @@ -616,12 +608,15 @@ describe('Execution Lifecycle Hooks', () => { }); }); - describe('getWorkflowHooksWorkerExecuter', () => { - beforeEach(() => { - hooks = getWorkflowHooksWorkerExecuter(executionMode, executionId, workflowData, { + describe('getLifecycleHooksForScalingWorker', () => { + const createHooks = (executionMode: WorkflowExecuteMode = 'manual') => + getLifecycleHooksForScalingWorker(executionMode, executionId, workflowData, { pushRef, retryOf, }); + + beforeEach(() => { + lifecycleHooks = createHooks(); }); nodeEventsTests(); @@ -629,33 +624,31 @@ describe('Execution Lifecycle Hooks', () => { statisticsTests(); it('should setup the correct set of hooks', () => { - expect(hooks).toBeInstanceOf(WorkflowHooks); - expect(hooks.mode).toBe('manual'); - expect(hooks.executionId).toBe(executionId); - expect(hooks.workflowData).toEqual(workflowData); + expect(lifecycleHooks).toBeInstanceOf(ExecutionLifecycleHooks); + expect(lifecycleHooks.mode).toBe('manual'); + expect(lifecycleHooks.executionId).toBe(executionId); + expect(lifecycleHooks.workflowData).toEqual(workflowData); - const { hookFunctions } = hooks; - expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(2); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); - expect(hookFunctions.nodeFetchedData).toHaveLength(1); - expect(hookFunctions.sendResponse).toHaveLength(0); + const { handlers } = lifecycleHooks; + expect(handlers.nodeExecuteBefore).toHaveLength(2); + expect(handlers.nodeExecuteAfter).toHaveLength(2); + expect(handlers.workflowExecuteBefore).toHaveLength(2); + expect(handlers.workflowExecuteAfter).toHaveLength(4); + expect(handlers.nodeFetchedData).toHaveLength(1); + expect(handlers.sendResponse).toHaveLength(0); }); describe('saving static data', () => { it('should skip saving static data for manual executions', async () => { - hooks.mode = 'manual'; - - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, staticData]); expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled(); }); it('should save static data for prod executions', async () => { - hooks.mode = 'trigger'; + lifecycleHooks = createHooks('trigger'); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, staticData]); expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith( workflowId, @@ -664,11 +657,11 @@ describe('Execution Lifecycle Hooks', () => { }); it('should handle static data saving errors', async () => { - hooks.mode = 'trigger'; + lifecycleHooks = createHooks('trigger'); const error = new Error('Static data save failed'); workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error); - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, staticData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, staticData]); expect(errorReporter.error).toHaveBeenCalledWith(error); }); @@ -676,21 +669,19 @@ describe('Execution Lifecycle Hooks', () => { describe('error workflow', () => { it('should not execute error workflow for manual executions', async () => { - hooks.mode = 'manual'; - - await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); expect(workflowExecutionService.executeErrorWorkflow).not.toHaveBeenCalled(); }); it('should execute error workflow for failed non-manual executions', async () => { - hooks.mode = 'trigger'; + lifecycleHooks = createHooks('trigger'); const errorWorkflow = 'error-workflow-id'; workflowData.settings = { errorWorkflow }; const project = mock(); ownershipService.getWorkflowProjectCached.calledWith(workflowId).mockResolvedValue(project); - await hooks.executeHookFunctions('workflowExecuteAfter', [failedRun, {}]); + await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); expect(workflowExecutionService.executeErrorWorkflow).toHaveBeenCalledWith( errorWorkflow, @@ -714,9 +705,14 @@ describe('Execution Lifecycle Hooks', () => { }); }); - describe('getWorkflowHooksIntegrated', () => { + describe('getLifecycleHooksForSubExecutions', () => { beforeEach(() => { - hooks = getWorkflowHooksIntegrated(executionMode, executionId, workflowData, undefined); + lifecycleHooks = getLifecycleHooksForSubExecutions( + 'manual', + executionId, + workflowData, + undefined, + ); }); workflowEventTests(); @@ -725,18 +721,18 @@ describe('Execution Lifecycle Hooks', () => { statisticsTests(); it('should setup the correct set of hooks', () => { - expect(hooks).toBeInstanceOf(WorkflowHooks); - expect(hooks.mode).toBe('manual'); - expect(hooks.executionId).toBe(executionId); - expect(hooks.workflowData).toEqual(workflowData); + expect(lifecycleHooks).toBeInstanceOf(ExecutionLifecycleHooks); + expect(lifecycleHooks.mode).toBe('manual'); + expect(lifecycleHooks.executionId).toBe(executionId); + expect(lifecycleHooks.workflowData).toEqual(workflowData); - const { hookFunctions } = hooks; - expect(hookFunctions.nodeExecuteBefore).toHaveLength(1); - expect(hookFunctions.nodeExecuteAfter).toHaveLength(1); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); - expect(hookFunctions.nodeFetchedData).toHaveLength(1); - expect(hookFunctions.sendResponse).toHaveLength(0); + const { handlers } = lifecycleHooks; + expect(handlers.nodeExecuteBefore).toHaveLength(1); + expect(handlers.nodeExecuteAfter).toHaveLength(1); + expect(handlers.workflowExecuteBefore).toHaveLength(2); + expect(handlers.workflowExecuteAfter).toHaveLength(4); + expect(handlers.nodeFetchedData).toHaveLength(1); + expect(handlers.sendResponse).toHaveLength(0); }); }); }); diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts index d355fe9db8..237bd25edb 100644 --- a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -1,18 +1,10 @@ import { Container } from '@n8n/di'; import { stringify } from 'flatted'; -import { ErrorReporter, Logger, InstanceSettings } from 'n8n-core'; -import { WorkflowHooks } from 'n8n-workflow'; +import { ErrorReporter, Logger, InstanceSettings, ExecutionLifecycleHooks } from 'n8n-core'; import type { - IDataObject, - INode, - IRun, - IRunExecutionData, - ITaskData, IWorkflowBase, - IWorkflowExecuteHooks, WorkflowExecuteMode, IWorkflowExecutionDataProcess, - Workflow, } from 'n8n-workflow'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; @@ -39,339 +31,255 @@ type HooksSetupParameters = { retryOf?: string; }; -function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks { - const result: IWorkflowExecuteHooks = { - nodeExecuteBefore: [], - nodeExecuteAfter: [], - workflowExecuteBefore: [], - workflowExecuteAfter: [], - sendResponse: [], - nodeFetchedData: [], - }; - for (const hooks of hookFunctions) { - for (const key in hooks) { - if (!result[key] || !hooks[key]) continue; - result[key].push(...hooks[key]); - } - } - return result; +function hookFunctionsWorkflowEvents(hooks: ExecutionLifecycleHooks, userId?: string) { + const eventService = Container.get(EventService); + hooks.addHandler('workflowExecuteBefore', function () { + const { executionId, workflowData } = this; + eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); + }); + hooks.addHandler('workflowExecuteAfter', function (runData) { + if (runData.status === 'waiting') return; + + const { executionId, workflowData: workflow } = this; + eventService.emit('workflow-post-execute', { executionId, runData, workflow, userId }); + }); } -function hookFunctionsWorkflowEvents(userId?: string): IWorkflowExecuteHooks { +function hookFunctionsNodeEvents(hooks: ExecutionLifecycleHooks) { const eventService = Container.get(EventService); - return { - workflowExecuteBefore: [ - async function (this: WorkflowHooks): Promise { - const { executionId, workflowData } = this; - eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); - }, - ], - workflowExecuteAfter: [ - async function (this: WorkflowHooks, runData: IRun): Promise { - if (runData.status === 'waiting') return; - - const { executionId, workflowData: workflow } = this; - eventService.emit('workflow-post-execute', { executionId, runData, workflow, userId }); - }, - ], - }; -} - -function hookFunctionsNodeEvents(): IWorkflowExecuteHooks { - const eventService = Container.get(EventService); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - }; + hooks.addHandler('nodeExecuteBefore', function (nodeName) { + const { executionId, workflowData: workflow } = this; + eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); + }); + hooks.addHandler('nodeExecuteAfter', function (nodeName) { + const { executionId, workflowData: workflow } = this; + eventService.emit('node-post-execute', { executionId, workflow, nodeName }); + }); } /** * Returns hook functions to push data to Editor-UI */ -function hookFunctionsPush({ pushRef, retryOf }: HooksSetupParameters): IWorkflowExecuteHooks { - if (!pushRef) return {}; +function hookFunctionsPush( + hooks: ExecutionLifecycleHooks, + { pushRef, retryOf }: HooksSetupParameters, +) { + if (!pushRef) return; const logger = Container.get(Logger); const pushInstance = Container.get(Push); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId } = this; - // Push data to session which started workflow before each - // node which starts rendering - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId, - pushRef, - workflowId: this.workflowData.id, - }); + hooks.addHandler('nodeExecuteBefore', function (nodeName) { + const { executionId } = this; + // Push data to session which started workflow before each + // node which starts rendering + logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { + executionId, + pushRef, + workflowId: this.workflowData.id, + }); - pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { - const { executionId } = this; - // Push data to session which started workflow after each rendered node - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId, - pushRef, - workflowId: this.workflowData.id, - }); + pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); + }); + hooks.addHandler('nodeExecuteAfter', function (nodeName, data) { + const { executionId } = this; + // Push data to session which started workflow after each rendered node + logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { + executionId, + pushRef, + workflowId: this.workflowData.id, + }); - pushInstance.send( - { type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, - pushRef, - ); - }, - ], - workflowExecuteBefore: [ - async function (this: WorkflowHooks, _workflow, data): Promise { - const { executionId } = this; - const { id: workflowId, name: workflowName } = this.workflowData; - logger.debug('Executing hook (hookFunctionsPush)', { + pushInstance.send({ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, pushRef); + }); + hooks.addHandler('workflowExecuteBefore', function (_workflow, data) { + const { executionId } = this; + const { id: workflowId, name: workflowName } = this.workflowData; + logger.debug('Executing hook (hookFunctionsPush)', { + executionId, + pushRef, + workflowId, + }); + // Push data to session which started the workflow + pushInstance.send( + { + type: 'executionStarted', + data: { executionId, - pushRef, + mode: this.mode, + startedAt: new Date(), + retryOf, workflowId, - }); - // Push data to session which started the workflow - pushInstance.send( - { - type: 'executionStarted', - data: { - executionId, - mode: this.mode, - startedAt: new Date(), - retryOf, - workflowId, - workflowName, - flattedRunData: data?.resultData.runData - ? stringify(data.resultData.runData) - : stringify({}), - }, - }, - pushRef, - ); + workflowName, + flattedRunData: data?.resultData.runData + ? stringify(data.resultData.runData) + : stringify({}), + }, }, - ], - workflowExecuteAfter: [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - const { executionId } = this; - const { id: workflowId } = this.workflowData; - logger.debug('Executing hook (hookFunctionsPush)', { - executionId, - pushRef, - workflowId, - }); + pushRef, + ); + }); + hooks.addHandler('workflowExecuteAfter', function (fullRunData) { + const { executionId } = this; + const { id: workflowId } = this.workflowData; + logger.debug('Executing hook (hookFunctionsPush)', { + executionId, + pushRef, + workflowId, + }); - const { status } = fullRunData; - if (status === 'waiting') { - pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef); - } else { - const rawData = stringify(fullRunData.data); - pushInstance.send( - { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, - pushRef, - ); - } - }, - ], - }; + const { status } = fullRunData; + if (status === 'waiting') { + pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef); + } else { + const rawData = stringify(fullRunData.data); + pushInstance.send( + { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, + pushRef, + ); + } + }); } -function hookFunctionsExternalHooks(): IWorkflowExecuteHooks { +function hookFunctionsExternalHooks(hooks: ExecutionLifecycleHooks) { const externalHooks = Container.get(ExternalHooks); - return { - workflowExecuteBefore: [ - async function (this: WorkflowHooks, workflow: Workflow): Promise { - await externalHooks.run('workflow.preExecute', [workflow, this.mode]); - }, - ], - workflowExecuteAfter: [ - async function (this: WorkflowHooks, fullRunData: IRun) { - await externalHooks.run('workflow.postExecute', [ - fullRunData, - this.workflowData, - this.executionId, - ]); - }, - ], - }; + hooks.addHandler('workflowExecuteBefore', async function (workflow) { + await externalHooks.run('workflow.preExecute', [workflow, this.mode]); + }); + hooks.addHandler('workflowExecuteAfter', async function (fullRunData) { + await externalHooks.run('workflow.postExecute', [ + fullRunData, + this.workflowData, + this.executionId, + ]); + }); } -function hookFunctionsSaveProgress({ saveSettings }: HooksSetupParameters): IWorkflowExecuteHooks { - if (!saveSettings.progress) return {}; - return { - nodeExecuteAfter: [ - async function ( - this: WorkflowHooks, - nodeName: string, - data: ITaskData, - executionData: IRunExecutionData, - ): Promise { - await saveExecutionProgress( - this.workflowData.id, - this.executionId, - nodeName, - data, - executionData, - ); - }, - ], - }; +function hookFunctionsSaveProgress( + hooks: ExecutionLifecycleHooks, + { saveSettings }: HooksSetupParameters, +) { + if (!saveSettings.progress) return; + hooks.addHandler('nodeExecuteAfter', async function (nodeName, data, executionData) { + await saveExecutionProgress( + this.workflowData.id, + this.executionId, + nodeName, + data, + executionData, + ); + }); } /** This should ideally be added before any other `workflowExecuteAfter` hook to ensure all hooks get the same execution status */ -function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks { - return { - workflowExecuteAfter: [ - async function (fullRunData: IRun) { - fullRunData.status = determineFinalExecutionStatus(fullRunData); - }, - ], - }; +function hookFunctionsFinalizeExecutionStatus(hooks: ExecutionLifecycleHooks) { + hooks.addHandler('workflowExecuteAfter', (fullRunData) => { + fullRunData.status = determineFinalExecutionStatus(fullRunData); + }); } -function hookFunctionsStatistics(): IWorkflowExecuteHooks { +function hookFunctionsStatistics(hooks: ExecutionLifecycleHooks) { const workflowStatisticsService = Container.get(WorkflowStatisticsService); - return { - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, - ], - }; + hooks.addHandler('nodeFetchedData', (workflowId, node) => { + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); + }); } /** * Returns hook functions to save workflow execution and call error workflow */ -function hookFunctionsSave({ - pushRef, - retryOf, - saveSettings, -}: HooksSetupParameters): IWorkflowExecuteHooks { +function hookFunctionsSave( + hooks: ExecutionLifecycleHooks, + { pushRef, retryOf, saveSettings }: HooksSetupParameters, +) { const logger = Container.get(Logger); const errorReporter = Container.get(ErrorReporter); const executionRepository = Container.get(ExecutionRepository); const workflowStaticDataService = Container.get(WorkflowStaticDataService); const workflowStatisticsService = Container.get(WorkflowStatisticsService); - return { - workflowExecuteAfter: [ - async function ( - this: WorkflowHooks, - fullRunData: IRun, - newStaticData: IDataObject, - ): Promise { - logger.debug('Executing hook (hookFunctionsSave)', { - executionId: this.executionId, + hooks.addHandler('workflowExecuteAfter', async function (fullRunData, newStaticData) { + logger.debug('Executing hook (hookFunctionsSave)', { + executionId: this.executionId, + workflowId: this.workflowData.id, + }); + + await restoreBinaryDataId(fullRunData, this.executionId, this.mode); + + const isManualMode = this.mode === 'manual'; + + try { + if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { + // Workflow is saved so update in database + try { + await workflowStaticDataService.saveStaticDataById(this.workflowData.id, newStaticData); + } catch (e) { + errorReporter.error(e); + logger.error( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, + { executionId: this.executionId, workflowId: this.workflowData.id }, + ); + } + } + + if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { + /** + * When manual executions are not being saved, we only soft-delete + * the execution so that the user can access its binary data + * while building their workflow. + * + * The manual execution and its binary data will be hard-deleted + * on the next pruning cycle after the grace period set by + * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. + */ + await executionRepository.softDelete(this.executionId); + + return; + } + + const shouldNotSave = + (fullRunData.status === 'success' && !saveSettings.success) || + (fullRunData.status !== 'success' && !saveSettings.error); + + if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, this.executionId, retryOf); + + await executionRepository.hardDelete({ workflowId: this.workflowData.id, + executionId: this.executionId, }); - await restoreBinaryDataId(fullRunData, this.executionId, this.mode); + return; + } - const isManualMode = this.mode === 'manual'; + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. + const fullExecutionData = prepareExecutionDataForDbUpdate({ + runData: fullRunData, + workflowData: this.workflowData, + workflowStatusFinal: fullRunData.status, + retryOf, + }); - try { - if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { - // Workflow is saved so update in database - try { - await workflowStaticDataService.saveStaticDataById( - this.workflowData.id, - newStaticData, - ); - } catch (e) { - errorReporter.error(e); - logger.error( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, - { executionId: this.executionId, workflowId: this.workflowData.id }, - ); - } - } + // When going into the waiting state, store the pushRef in the execution-data + if (fullRunData.waitTill && isManualMode) { + fullExecutionData.data.pushRef = pushRef; + } - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { - /** - * When manual executions are not being saved, we only soft-delete - * the execution so that the user can access its binary data - * while building their workflow. - * - * The manual execution and its binary data will be hard-deleted - * on the next pruning cycle after the grace period set by - * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. - */ - await executionRepository.softDelete(this.executionId); + await updateExistingExecution({ + executionId: this.executionId, + workflowId: this.workflowData.id, + executionData: fullExecutionData, + }); - return; - } - - const shouldNotSave = - (fullRunData.status === 'success' && !saveSettings.success) || - (fullRunData.status !== 'success' && !saveSettings.error); - - if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - retryOf, - ); - - await executionRepository.hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - - return; - } - - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const fullExecutionData = prepareExecutionDataForDbUpdate({ - runData: fullRunData, - workflowData: this.workflowData, - workflowStatusFinal: fullRunData.status, - retryOf, - }); - - // When going into the waiting state, store the pushRef in the execution-data - if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = pushRef; - } - - await updateExistingExecution({ - executionId: this.executionId, - workflowId: this.workflowData.id, - executionData: fullExecutionData, - }); - - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - retryOf, - ); - } - } finally { - workflowStatisticsService.emit('workflowExecutionCompleted', { - workflowData: this.workflowData, - fullRunData, - }); - } - }, - ], - }; + if (!isManualMode) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, this.executionId, retryOf); + } + } finally { + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, + fullRunData, + }); + } + }); } /** @@ -379,224 +287,196 @@ function hookFunctionsSave({ * for running with queues. Manual executions should never run on queues as * they are always executed in the main process. */ -function hookFunctionsSaveWorker({ - pushRef, - retryOf, -}: HooksSetupParameters): IWorkflowExecuteHooks { +function hookFunctionsSaveWorker( + hooks: ExecutionLifecycleHooks, + { pushRef, retryOf }: HooksSetupParameters, +) { const logger = Container.get(Logger); const errorReporter = Container.get(ErrorReporter); const workflowStaticDataService = Container.get(WorkflowStaticDataService); const workflowStatisticsService = Container.get(WorkflowStatisticsService); - return { - workflowExecuteAfter: [ - async function ( - this: WorkflowHooks, - fullRunData: IRun, - newStaticData: IDataObject, - ): Promise { - logger.debug('Executing hook (hookFunctionsSaveWorker)', { - executionId: this.executionId, - workflowId: this.workflowData.id, - }); + hooks.addHandler('workflowExecuteAfter', async function (fullRunData, newStaticData) { + logger.debug('Executing hook (hookFunctionsSaveWorker)', { + executionId: this.executionId, + workflowId: this.workflowData.id, + }); - const isManualMode = this.mode === 'manual'; + const isManualMode = this.mode === 'manual'; + try { + if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { + // Workflow is saved so update in database try { - if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { - // Workflow is saved so update in database - try { - await workflowStaticDataService.saveStaticDataById( - this.workflowData.id, - newStaticData, - ); - } catch (e) { - errorReporter.error(e); - logger.error( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, - { workflowId: this.workflowData.id }, - ); - } - } - - if ( - !isManualMode && - fullRunData.status !== 'success' && - fullRunData.status !== 'waiting' - ) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - retryOf, - ); - } - - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const fullExecutionData = prepareExecutionDataForDbUpdate({ - runData: fullRunData, - workflowData: this.workflowData, - workflowStatusFinal: fullRunData.status, - retryOf, - }); - - // When going into the waiting state, store the pushRef in the execution-data - if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = pushRef; - } - - await updateExistingExecution({ - executionId: this.executionId, - workflowId: this.workflowData.id, - executionData: fullExecutionData, - }); - } finally { - workflowStatisticsService.emit('workflowExecutionCompleted', { - workflowData: this.workflowData, - fullRunData, - }); + await workflowStaticDataService.saveStaticDataById(this.workflowData.id, newStaticData); + } catch (e) { + errorReporter.error(e); + logger.error( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, + { workflowId: this.workflowData.id }, + ); } - }, - ], - }; + } + + if (!isManualMode && fullRunData.status !== 'success' && fullRunData.status !== 'waiting') { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, this.executionId, retryOf); + } + + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. + const fullExecutionData = prepareExecutionDataForDbUpdate({ + runData: fullRunData, + workflowData: this.workflowData, + workflowStatusFinal: fullRunData.status, + retryOf, + }); + + // When going into the waiting state, store the pushRef in the execution-data + if (fullRunData.waitTill && isManualMode) { + fullExecutionData.data.pushRef = pushRef; + } + + await updateExistingExecution({ + executionId: this.executionId, + workflowId: this.workflowData.id, + executionData: fullExecutionData, + }); + } finally { + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, + fullRunData, + }); + } + }); } /** - * Returns WorkflowHooks instance for running integrated workflows + * Returns ExecutionLifecycleHooks instance for running integrated workflows * (Workflows which get started inside of another workflow) */ -export function getWorkflowHooksIntegrated( +export function getLifecycleHooksForSubExecutions( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, userId?: string, -): WorkflowHooks { +): ExecutionLifecycleHooks { + const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); const saveSettings = toSaveSettings(workflowData.settings); - const hookFunctions = mergeHookFunctions( - hookFunctionsWorkflowEvents(userId), - hookFunctionsNodeEvents(), - hookFunctionsFinalizeExecutionStatus(), - hookFunctionsSave({ saveSettings }), - hookFunctionsSaveProgress({ saveSettings }), - hookFunctionsStatistics(), - hookFunctionsExternalHooks(), - ); - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); + hookFunctionsWorkflowEvents(hooks, userId); + hookFunctionsNodeEvents(hooks); + hookFunctionsFinalizeExecutionStatus(hooks); + hookFunctionsSave(hooks, { saveSettings }); + hookFunctionsSaveProgress(hooks, { saveSettings }); + hookFunctionsStatistics(hooks); + hookFunctionsExternalHooks(hooks); + return hooks; } /** - * Returns WorkflowHooks instance for worker in scaling mode. + * Returns ExecutionLifecycleHooks instance for worker in scaling mode. */ -export function getWorkflowHooksWorkerExecuter( +export function getLifecycleHooksForScalingWorker( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, { pushRef, retryOf }: Omit = {}, -): WorkflowHooks { +): ExecutionLifecycleHooks { + const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); const saveSettings = toSaveSettings(workflowData.settings); const optionalParameters = { pushRef, retryOf, saveSettings }; - const toMerge = [ - hookFunctionsNodeEvents(), - hookFunctionsFinalizeExecutionStatus(), - hookFunctionsSaveWorker(optionalParameters), - hookFunctionsSaveProgress(optionalParameters), - hookFunctionsStatistics(), - hookFunctionsExternalHooks(), - ]; + hookFunctionsNodeEvents(hooks); + hookFunctionsFinalizeExecutionStatus(hooks); + hookFunctionsSaveWorker(hooks, optionalParameters); + hookFunctionsSaveProgress(hooks, optionalParameters); + hookFunctionsStatistics(hooks); + hookFunctionsExternalHooks(hooks); if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { - toMerge.push(hookFunctionsPush(optionalParameters)); + hookFunctionsPush(hooks, optionalParameters); } - const hookFunctions = mergeHookFunctions(...toMerge); - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); + return hooks; } /** - * Returns WorkflowHooks instance for main process if workflow runs via worker + * Returns ExecutionLifecycleHooks instance for main process if workflow runs via worker */ -export function getWorkflowHooksWorkerMain( +export function getLifecycleHooksForScalingMain( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, { pushRef, retryOf }: Omit = {}, -): WorkflowHooks { +): ExecutionLifecycleHooks { + const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); const saveSettings = toSaveSettings(workflowData.settings); const optionalParameters = { pushRef, retryOf, saveSettings }; const executionRepository = Container.get(ExecutionRepository); - const hookFunctions = mergeHookFunctions( - hookFunctionsWorkflowEvents(), - hookFunctionsSaveProgress(optionalParameters), - hookFunctionsExternalHooks(), - hookFunctionsFinalizeExecutionStatus(), - { - workflowExecuteAfter: [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - // Don't delete executions before they are finished - if (!fullRunData.finished) return; - const isManualMode = this.mode === 'manual'; + hookFunctionsWorkflowEvents(hooks); + hookFunctionsSaveProgress(hooks, optionalParameters); + hookFunctionsExternalHooks(hooks); + hookFunctionsFinalizeExecutionStatus(hooks); - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { - /** - * When manual executions are not being saved, we only soft-delete - * the execution so that the user can access its binary data - * while building their workflow. - * - * The manual execution and its binary data will be hard-deleted - * on the next pruning cycle after the grace period set by - * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. - */ - await executionRepository.softDelete(this.executionId); + hooks.addHandler('workflowExecuteAfter', async function (fullRunData) { + // Don't delete executions before they are finished + if (!fullRunData.finished) return; - return; - } + const isManualMode = this.mode === 'manual'; - const shouldNotSave = - (fullRunData.status === 'success' && !saveSettings.success) || - (fullRunData.status !== 'success' && !saveSettings.error); + if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { + /** + * When manual executions are not being saved, we only soft-delete + * the execution so that the user can access its binary data + * while building their workflow. + * + * The manual execution and its binary data will be hard-deleted + * on the next pruning cycle after the grace period set by + * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. + */ + await executionRepository.softDelete(this.executionId); - if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { - await executionRepository.hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - } - }, - ], - }, - ); + return; + } + + const shouldNotSave = + (fullRunData.status === 'success' && !saveSettings.success) || + (fullRunData.status !== 'success' && !saveSettings.error); + + if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { + await executionRepository.hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + } + }); // When running with worker mode, main process executes // Only workflowExecuteBefore + workflowExecuteAfter // So to avoid confusion, we are removing other hooks. - hookFunctions.nodeExecuteBefore = []; - hookFunctions.nodeExecuteAfter = []; + hooks.handlers.nodeExecuteBefore = []; + hooks.handlers.nodeExecuteAfter = []; - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); + return hooks; } /** - * Returns WorkflowHooks instance for running the main workflow + * Returns ExecutionLifecycleHooks instance for running the main workflow */ -export function getWorkflowHooksMain( +export function getLifecycleHooksForRegularMain( data: IWorkflowExecutionDataProcess, executionId: string, -): WorkflowHooks { - const { pushRef, retryOf } = data; - const saveSettings = toSaveSettings(data.workflowData.settings); +): ExecutionLifecycleHooks { + const { pushRef, retryOf, executionMode, workflowData } = data; + const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData); + const saveSettings = toSaveSettings(workflowData.settings); const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings }; - const hookFunctions = mergeHookFunctions( - hookFunctionsWorkflowEvents(), - hookFunctionsNodeEvents(), - hookFunctionsFinalizeExecutionStatus(), - hookFunctionsSave(optionalParameters), - hookFunctionsPush(optionalParameters), - hookFunctionsSaveProgress(optionalParameters), - hookFunctionsStatistics(), - hookFunctionsExternalHooks(), - ); - return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData); + hookFunctionsWorkflowEvents(hooks); + hookFunctionsNodeEvents(hooks); + hookFunctionsFinalizeExecutionStatus(hooks); + hookFunctionsSave(hooks, optionalParameters); + hookFunctionsPush(hooks, optionalParameters); + hookFunctionsSaveProgress(hooks, optionalParameters); + hookFunctionsStatistics(hooks); + hookFunctionsExternalHooks(hooks); + return hooks; } diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index e6ff4ad931..3a5d5a65b1 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -8,7 +8,7 @@ import { ARTIFICIAL_TASK_DATA } from '@/constants'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; -import { getWorkflowHooksMain } from '@/execution-lifecycle/execution-lifecycle-hooks'; +import { getLifecycleHooksForRegularMain } from '@/execution-lifecycle/execution-lifecycle-hooks'; import type { IExecutionResponse } from '@/interfaces'; import { Push } from '@/push'; @@ -174,7 +174,7 @@ export class ExecutionRecoveryService { private async runHooks(execution: IExecutionResponse) { execution.data ??= { resultData: { runData: {} } }; - const externalHooks = getWorkflowHooksMain( + const lifecycleHooks = getLifecycleHooksForRegularMain( { userId: '', workflowData: execution.workflowData, @@ -196,6 +196,6 @@ export class ExecutionRecoveryService { status: execution.status, }; - await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]); + await lifecycleHooks.runHook('workflowExecuteAfter', [run]); } } diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 363d7dab15..18a6f6b39f 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -19,7 +19,7 @@ import type PCancelable from 'p-cancelable'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; -import { getWorkflowHooksWorkerExecuter } from '@/execution-lifecycle/execution-lifecycle-hooks'; +import { getLifecycleHooksForScalingWorker } from '@/execution-lifecycle/execution-lifecycle-hooks'; import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; @@ -131,30 +131,29 @@ export class JobProcessor { const { pushRef } = job.data; - additionalData.hooks = getWorkflowHooksWorkerExecuter( + const lifecycleHooks = getLifecycleHooksForScalingWorker( execution.mode, job.data.executionId, execution.workflowData, { retryOf: execution.retryOf ?? undefined, pushRef }, ); + additionalData.hooks = lifecycleHooks; if (pushRef) { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({ pushRef }); } - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - const msg: RespondToWebhookMessage = { - kind: 'respond-to-webhook', - executionId, - response: this.encodeWebhookResponse(response), - workerId: this.instanceSettings.hostId, - }; + lifecycleHooks.addHandler('sendResponse', async (response): Promise => { + const msg: RespondToWebhookMessage = { + kind: 'respond-to-webhook', + executionId, + response: this.encodeWebhookResponse(response), + workerId: this.instanceSettings.hostId, + }; - await job.progress(msg); - }, - ]; + await job.progress(msg); + }); additionalData.executionId = executionId; @@ -206,7 +205,7 @@ export class JobProcessor { data: { resultData: { error, runData: {} } }, }; - await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [runData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [runData]); return { success: false }; } throw error; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 85d1201ed0..1c5dfcbf67 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -37,7 +37,7 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { EventService } from '@/events/event.service'; import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map'; -import { getWorkflowHooksIntegrated } from '@/execution-lifecycle/execution-lifecycle-hooks'; +import { getLifecycleHooksForSubExecutions } from '@/execution-lifecycle/execution-lifecycle-hooks'; import type { UpdateExecutionPayload } from '@/interfaces'; import { NodeTypes } from '@/node-types'; import { Push } from '@/push'; @@ -217,7 +217,7 @@ async function startExecution( // Create new additionalData to have different workflow loaded and to call // different webhooks const additionalDataIntegrated = await getBase(); - additionalDataIntegrated.hooks = getWorkflowHooksIntegrated( + additionalDataIntegrated.hooks = getLifecycleHooksForSubExecutions( runData.executionMode, executionId, workflowData, diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index e258284827..82beb20648 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -3,6 +3,7 @@ /* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { Container, Service } from '@n8n/di'; +import type { ExecutionLifecycleHooks } from 'n8n-core'; import { ErrorReporter, InstanceSettings, Logger, WorkflowExecute } from 'n8n-core'; import type { ExecutionError, @@ -11,7 +12,6 @@ import type { IPinData, IRun, WorkflowExecuteMode, - WorkflowHooks, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; import { ExecutionCancelledError, Workflow } from 'n8n-workflow'; @@ -22,9 +22,9 @@ import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import { - getWorkflowHooksMain, - getWorkflowHooksWorkerExecuter, - getWorkflowHooksWorkerMain, + getLifecycleHooksForRegularMain, + getLifecycleHooksForScalingWorker, + getLifecycleHooksForScalingMain, } from '@/execution-lifecycle/execution-lifecycle-hooks'; import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; @@ -61,7 +61,7 @@ export class WorkflowRunner { startedAt: Date, executionMode: WorkflowExecuteMode, executionId: string, - hooks?: WorkflowHooks, + hooks?: ExecutionLifecycleHooks, ) { // This means the execution was probably cancelled and has already // been cleaned up. @@ -116,9 +116,7 @@ export class WorkflowRunner { // set the execution to failed. this.activeExecutions.finalizeExecution(executionId, fullRunData); - if (hooks) { - await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]); - } + await hooks?.runHook('workflowExecuteAfter', [fullRunData]); } /** Run the workflow @@ -140,12 +138,9 @@ export class WorkflowRunner { } catch (error) { // Create a failed execution with the data for the node, save it and abort execution const runData = generateFailedExecutionFromError(data.executionMode, error, error.node); - const workflowHooks = getWorkflowHooksMain(data, executionId); - await workflowHooks.executeHookFunctions('workflowExecuteBefore', [ - undefined, - data.executionData, - ]); - await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); + const lifecycleHooks = getLifecycleHooksForRegularMain(data, executionId); + await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [runData]); responsePromise?.reject(error); this.activeExecutions.finalizeExecution(executionId); return executionId; @@ -250,13 +245,12 @@ export class WorkflowRunner { await this.executionRepository.setRunning(executionId); // write try { - additionalData.hooks = getWorkflowHooksMain(data, executionId); + const lifecycleHooks = getLifecycleHooksForRegularMain(data, executionId); + additionalData.hooks = lifecycleHooks; - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - this.activeExecutions.resolveResponsePromise(executionId, response); - }, - ]; + lifecycleHooks.addHandler('sendResponse', (response) => { + this.activeExecutions.resolveResponsePromise(executionId, response); + }); additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ executionId, @@ -347,27 +341,32 @@ export class WorkflowRunner { // TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds. // Check if they get retried by default and how often. let job: Job; - let hooks: WorkflowHooks; + let lifecycleHooks: ExecutionLifecycleHooks; try { job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); - hooks = getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { - retryOf: data.retryOf ?? undefined, - }); + lifecycleHooks = getLifecycleHooksForScalingMain( + data.executionMode, + executionId, + data.workflowData, + { + retryOf: data.retryOf ?? undefined, + }, + ); // Normally also workflow should be supplied here but as it only used for sending // data to editor-UI is not needed. - await hooks.executeHookFunctions('workflowExecuteBefore', [undefined, data.executionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]); } catch (error) { - // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the + // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // "workflowExecuteAfter" which we require. - const hooks = getWorkflowHooksWorkerExecuter( + const lifecycleHooks = getLifecycleHooksForScalingWorker( data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ?? undefined }, ); - await this.processError(error, new Date(), data.executionMode, executionId, hooks); + await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks); throw error; } @@ -377,9 +376,9 @@ export class WorkflowRunner { onCancel(async () => { await this.scalingService.stopJob(job); - // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the + // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // "workflowExecuteAfter" which we require. - const hooksWorker = getWorkflowHooksWorkerExecuter( + const lifecycleHooks = getLifecycleHooksForScalingWorker( data.executionMode, executionId, data.workflowData, @@ -387,7 +386,13 @@ export class WorkflowRunner { ); const error = new ExecutionCancelledError(executionId); - await this.processError(error, new Date(), data.executionMode, executionId, hooksWorker); + await this.processError( + error, + new Date(), + data.executionMode, + executionId, + lifecycleHooks, + ); reject(error); }); @@ -402,16 +407,22 @@ export class WorkflowRunner { error = new MaxStalledCountError(error); } - // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the + // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // "workflowExecuteAfter" which we require. - const hooks = getWorkflowHooksWorkerExecuter( + const lifecycleHooks = getLifecycleHooksForScalingWorker( data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ?? undefined }, ); - await this.processError(error, new Date(), data.executionMode, executionId, hooks); + await this.processError( + error, + new Date(), + data.executionMode, + executionId, + lifecycleHooks, + ); reject(error); } @@ -437,7 +448,7 @@ export class WorkflowRunner { // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed. - await hooks.executeHookFunctions('workflowExecuteAfter', [runData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [runData]); resolve(runData); }, diff --git a/packages/core/src/__tests__/node-execute-functions.test.ts b/packages/core/src/__tests__/node-execute-functions.test.ts index fff267c593..96306ac90b 100644 --- a/packages/core/src/__tests__/node-execute-functions.test.ts +++ b/packages/core/src/__tests__/node-execute-functions.test.ts @@ -6,10 +6,10 @@ import type { IRequestOptions, IWorkflowExecuteAdditionalData, Workflow, - WorkflowHooks, } from 'n8n-workflow'; import nock from 'nock'; +import type { ExecutionLifecycleHooks } from '@/execution-engine/execution-lifecycle-hooks'; import { copyInputItems, invokeAxios, @@ -21,12 +21,12 @@ describe('NodeExecuteFunctions', () => { describe('proxyRequestToAxios', () => { const baseUrl = 'http://example.de'; const workflow = mock(); - const hooks = mock(); + const hooks = mock(); const additionalData = mock({ hooks }); const node = mock(); beforeEach(() => { - hooks.executeHookFunctions.mockClear(); + hooks.runHook.mockClear(); }); test('should rethrow an error with `status` property', async () => { @@ -42,10 +42,7 @@ describe('NodeExecuteFunctions', () => { test('should not throw if the response status is 200', async () => { nock(baseUrl).get('/test').reply(200); await proxyRequestToAxios(workflow, additionalData, node, `${baseUrl}/test`); - expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [ - workflow.id, - node, - ]); + expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); test('should throw if the response status is 403', async () => { @@ -65,7 +62,7 @@ describe('NodeExecuteFunctions', () => { expect(error.config).toBeUndefined(); expect(error.message).toEqual('403 - "Forbidden"'); } - expect(hooks.executeHookFunctions).not.toHaveBeenCalled(); + expect(hooks.runHook).not.toHaveBeenCalled(); }); test('should not throw if the response status is 404, but `simple` option is set to `false`', async () => { @@ -76,10 +73,7 @@ describe('NodeExecuteFunctions', () => { }); expect(response).toEqual('Not Found'); - expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [ - workflow.id, - node, - ]); + expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); test('should return full response when `resolveWithFullResponse` is set to true', async () => { @@ -96,10 +90,7 @@ describe('NodeExecuteFunctions', () => { statusCode: 404, statusMessage: 'Not Found', }); - expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [ - workflow.id, - node, - ]); + expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); describe('redirects', () => { diff --git a/packages/core/src/execution-engine/__tests__/execution-lifecycle-hooks.test.ts b/packages/core/src/execution-engine/__tests__/execution-lifecycle-hooks.test.ts new file mode 100644 index 0000000000..fb85216b25 --- /dev/null +++ b/packages/core/src/execution-engine/__tests__/execution-lifecycle-hooks.test.ts @@ -0,0 +1,113 @@ +import { mock } from 'jest-mock-extended'; +import type { + IDataObject, + IExecuteResponsePromiseData, + INode, + IRun, + IRunExecutionData, + ITaskData, + IWorkflowBase, + Workflow, +} from 'n8n-workflow'; + +import type { + ExecutionLifecycleHookName, + ExecutionLifecyleHookHandlers, +} from '../execution-lifecycle-hooks'; +import { ExecutionLifecycleHooks } from '../execution-lifecycle-hooks'; + +describe('ExecutionLifecycleHooks', () => { + const executionId = '123'; + const workflowData = mock(); + + let hooks: ExecutionLifecycleHooks; + beforeEach(() => { + jest.clearAllMocks(); + hooks = new ExecutionLifecycleHooks('internal', executionId, workflowData); + }); + + describe('constructor()', () => { + it('should initialize with correct properties', () => { + expect(hooks.mode).toBe('internal'); + expect(hooks.executionId).toBe(executionId); + expect(hooks.workflowData).toBe(workflowData); + expect(hooks.handlers).toEqual({ + nodeExecuteAfter: [], + nodeExecuteBefore: [], + nodeFetchedData: [], + sendResponse: [], + workflowExecuteAfter: [], + workflowExecuteBefore: [], + }); + }); + }); + + describe('addHandler()', () => { + const hooksHandlers = + mock<{ + [K in keyof ExecutionLifecyleHookHandlers]: ExecutionLifecyleHookHandlers[K][number]; + }>(); + + const testCases: Array<{ + hook: ExecutionLifecycleHookName; + args: Parameters; + }> = [ + { hook: 'nodeExecuteBefore', args: ['testNode'] }, + { + hook: 'nodeExecuteAfter', + args: ['testNode', mock(), mock()], + }, + { hook: 'workflowExecuteBefore', args: [mock(), mock()] }, + { hook: 'workflowExecuteAfter', args: [mock(), mock()] }, + { hook: 'sendResponse', args: [mock()] }, + { hook: 'nodeFetchedData', args: ['workflow123', mock()] }, + ]; + + test.each(testCases)( + 'should add handlers to $hook hook and call them', + async ({ hook, args }) => { + hooks.addHandler(hook, hooksHandlers[hook]); + await hooks.runHook(hook, args); + expect(hooksHandlers[hook]).toHaveBeenCalledWith(...args); + }, + ); + }); + + describe('runHook()', () => { + it('should execute multiple hooks in order', async () => { + const executionOrder: string[] = []; + const hook1 = jest.fn().mockImplementation(async () => { + executionOrder.push('hook1'); + }); + const hook2 = jest.fn().mockImplementation(async () => { + executionOrder.push('hook2'); + }); + + hooks.addHandler('nodeExecuteBefore', hook1, hook2); + await hooks.runHook('nodeExecuteBefore', ['testNode']); + + expect(executionOrder).toEqual(['hook1', 'hook2']); + expect(hook1).toHaveBeenCalled(); + expect(hook2).toHaveBeenCalled(); + }); + + it('should maintain correct "this" context', async () => { + const hook = jest.fn().mockImplementation(async function (this: ExecutionLifecycleHooks) { + expect(this.executionId).toBe(executionId); + expect(this.mode).toBe('internal'); + }); + + hooks.addHandler('nodeExecuteBefore', hook); + await hooks.runHook('nodeExecuteBefore', ['testNode']); + + expect(hook).toHaveBeenCalled(); + }); + + it('should handle errors in hooks', async () => { + const errorHook = jest.fn().mockRejectedValue(new Error('Hook failed')); + hooks.addHandler('nodeExecuteBefore', errorHook); + + await expect(hooks.runHook('nodeExecuteBefore', ['testNode'])).rejects.toThrow('Hook failed'); + }); + }); +}); diff --git a/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts b/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts index 0b7b9cd4a4..25b1fddb26 100644 --- a/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts +++ b/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts @@ -9,10 +9,10 @@ import type { INodeType, INodeTypes, ITriggerFunctions, - WorkflowHooks, IRun, } from 'n8n-workflow'; +import { ExecutionLifecycleHooks } from '../execution-lifecycle-hooks'; import { TriggersAndPollers } from '../triggers-and-pollers'; describe('TriggersAndPollers', () => { @@ -23,15 +23,8 @@ describe('TriggersAndPollers', () => { }); const nodeTypes = mock(); const workflow = mock({ nodeTypes }); - const hookFunctions = mock({ - sendResponse: [], - workflowExecuteAfter: [], - }); - const additionalData = mock({ - hooks: { - hookFunctions, - }, - }); + const hooks = new ExecutionLifecycleHooks('internal', '123', mock()); + const additionalData = mock({ hooks }); const triggersAndPollers = new TriggersAndPollers(); beforeEach(() => { @@ -98,8 +91,7 @@ describe('TriggersAndPollers', () => { getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise); - expect(hookFunctions.sendResponse?.length).toBe(1); - await hookFunctions.sendResponse![0]?.({ testResponse: true }); + await hooks.runHook('sendResponse', [{ testResponse: true }]); expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); }); @@ -111,10 +103,10 @@ describe('TriggersAndPollers', () => { await runTriggerHelper('manual'); getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise, donePromise); - await hookFunctions.sendResponse![0]?.({ testResponse: true }); + await hooks.runHook('sendResponse', [{ testResponse: true }]); expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); - await hookFunctions.workflowExecuteAfter?.[0]?.(mockRunData, {}); + await hooks.runHook('workflowExecuteAfter', [mockRunData, {}]); expect(donePromise.resolve).toHaveBeenCalledWith(mockRunData); }); }); diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts index c5924e26f6..10af4a1e7b 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts @@ -44,6 +44,7 @@ import { import * as Helpers from '@test/helpers'; import { legacyWorkflowExecuteTests, v1WorkflowExecuteTests } from '@test/helpers/constants'; +import type { ExecutionLifecycleHooks } from '../execution-lifecycle-hooks'; import { DirectedGraph } from '../partial-execution-utils'; import * as partialExecutionUtils from '../partial-execution-utils'; import { createNodeData, toITaskData } from '../partial-execution-utils/__tests__/helpers'; @@ -1211,6 +1212,7 @@ describe('WorkflowExecute', () => { let runExecutionData: IRunExecutionData; let workflowExecute: WorkflowExecute; + let additionalData: IWorkflowExecuteAdditionalData; beforeEach(() => { runExecutionData = { @@ -1224,9 +1226,12 @@ describe('WorkflowExecute', () => { waitingExecutionSource: null, }, }; - workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + additionalData = mock(); + additionalData.hooks = mock(); - jest.spyOn(workflowExecute, 'executeHook').mockResolvedValue(undefined); + workflowExecute = new WorkflowExecute(additionalData, 'manual', runExecutionData); + + jest.spyOn(additionalData.hooks, 'runHook').mockResolvedValue(undefined); jest.spyOn(workflowExecute, 'moveNodeMetadata').mockImplementation(); }); @@ -1294,7 +1299,7 @@ describe('WorkflowExecute', () => { // Verify static data handling expect(result).toBeDefined(); expect(workflowExecute.moveNodeMetadata).toHaveBeenCalled(); - expect(workflowExecute.executeHook).toHaveBeenCalledWith('workflowExecuteAfter', [ + expect(additionalData.hooks?.runHook).toHaveBeenCalledWith('workflowExecuteAfter', [ result, workflow.staticData, ]); diff --git a/packages/core/src/execution-engine/execution-lifecycle-hooks.ts b/packages/core/src/execution-engine/execution-lifecycle-hooks.ts new file mode 100644 index 0000000000..047d5a2075 --- /dev/null +++ b/packages/core/src/execution-engine/execution-lifecycle-hooks.ts @@ -0,0 +1,119 @@ +import type { + IDataObject, + IExecuteResponsePromiseData, + INode, + IRun, + IRunExecutionData, + ITaskData, + IWorkflowBase, + Workflow, + WorkflowExecuteMode, +} from 'n8n-workflow'; + +export type ExecutionLifecyleHookHandlers = { + nodeExecuteBefore: Array< + (this: ExecutionLifecycleHooks, nodeName: string) => Promise | void + >; + + nodeExecuteAfter: Array< + ( + this: ExecutionLifecycleHooks, + nodeName: string, + data: ITaskData, + executionData: IRunExecutionData, + ) => Promise | void + >; + + workflowExecuteBefore: Array< + ( + this: ExecutionLifecycleHooks, + workflow: Workflow, + data?: IRunExecutionData, + ) => Promise | void + >; + + workflowExecuteAfter: Array< + (this: ExecutionLifecycleHooks, data: IRun, newStaticData: IDataObject) => Promise | void + >; + + /** Used by trigger and webhook nodes to respond back to the request */ + sendResponse: Array< + (this: ExecutionLifecycleHooks, response: IExecuteResponsePromiseData) => Promise | void + >; + + /** + * Executed after a node fetches data + * - For a webhook node, after the node had been run. + * - For a http-request node, or any other node that makes http requests that still use the deprecated request* methods, after every successful http request +s */ + nodeFetchedData: Array< + (this: ExecutionLifecycleHooks, workflowId: string, node: INode) => Promise | void + >; +}; + +export type ExecutionLifecycleHookName = keyof ExecutionLifecyleHookHandlers; + +/** + * Contains hooks that trigger at specific events in an execution's lifecycle. Every hook has an array of callbacks to run. + * + * Common use cases include: + * - Saving execution progress to database + * - Pushing execution status updates to the frontend + * - Recording workflow statistics + * - Running external hooks for execution events + * - Error and Cancellation handling and cleanup + * + * @example + * ```typescript + * const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); + * hooks.add('workflowExecuteAfter, async function(fullRunData) { + * await saveToDatabase(executionId, fullRunData); + *}); + * ``` + */ +export class ExecutionLifecycleHooks { + readonly handlers: ExecutionLifecyleHookHandlers = { + nodeExecuteAfter: [], + nodeExecuteBefore: [], + nodeFetchedData: [], + sendResponse: [], + workflowExecuteAfter: [], + workflowExecuteBefore: [], + }; + + constructor( + readonly mode: WorkflowExecuteMode, + readonly executionId: string, + readonly workflowData: IWorkflowBase, + ) {} + + addHandler( + hookName: Hook, + ...handlers: Array + ): void { + // @ts-expect-error FIX THIS + this.handlers[hookName].push(...handlers); + } + + async runHook< + Hook extends keyof ExecutionLifecyleHookHandlers, + Params extends unknown[] = Parameters< + Exclude[number] + >, + >(hookName: Hook, parameters: Params) { + const hooks = this.handlers[hookName]; + for (const hookFunction of hooks) { + const typedHookFunction = hookFunction as unknown as ( + this: ExecutionLifecycleHooks, + ...args: Params + ) => Promise; + await typedHookFunction.apply(this, parameters); + } + } +} + +declare module 'n8n-workflow' { + interface IWorkflowExecuteAdditionalData { + hooks?: ExecutionLifecycleHooks; + } +} diff --git a/packages/core/src/execution-engine/index.ts b/packages/core/src/execution-engine/index.ts index bf99655e65..bb8adfb34c 100644 --- a/packages/core/src/execution-engine/index.ts +++ b/packages/core/src/execution-engine/index.ts @@ -5,3 +5,4 @@ export * from './node-execution-context'; export * from './partial-execution-utils'; export * from './node-execution-context/utils/execution-metadata'; export * from './workflow-execute'; +export { ExecutionLifecycleHooks } from './execution-lifecycle-hooks'; diff --git a/packages/core/src/execution-engine/node-execution-context/execute-context.ts b/packages/core/src/execution-engine/node-execution-context/execute-context.ts index 61dada86df..bf2e89f8ec 100644 --- a/packages/core/src/execution-engine/node-execution-context/execute-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/execute-context.ts @@ -194,7 +194,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti } async sendResponse(response: IExecuteResponsePromiseData): Promise { - await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]); + await this.additionalData.hooks?.runHook('sendResponse', [response]); } /** @deprecated use ISupplyDataFunctions.addInputData */ diff --git a/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts b/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts index 55328d6690..33e2723a20 100644 --- a/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts @@ -258,12 +258,12 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData } runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData; - await additionalData.hooks?.executeHookFunctions('nodeExecuteBefore', [nodeName]); + await additionalData.hooks?.runHook('nodeExecuteBefore', [nodeName]); } else { // Outputs taskData.executionTime = new Date().getTime() - taskData.startTime; - await additionalData.hooks?.executeHookFunctions('nodeExecuteAfter', [ + await additionalData.hooks?.runHook('nodeExecuteAfter', [ nodeName, taskData, this.runExecutionData, diff --git a/packages/core/src/execution-engine/triggers-and-pollers.ts b/packages/core/src/execution-engine/triggers-and-pollers.ts index 308893e9f6..9c90327149 100644 --- a/packages/core/src/execution-engine/triggers-and-pollers.ts +++ b/packages/core/src/execution-engine/triggers-and-pollers.ts @@ -13,6 +13,7 @@ import type { IExecuteResponsePromiseData, IRun, } from 'n8n-workflow'; +import assert from 'node:assert'; import type { IGetExecuteTriggerFunctions } from './interfaces'; @@ -47,46 +48,34 @@ export class TriggersAndPollers { // Add the manual trigger response which resolves when the first time data got emitted triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => { + const { hooks } = additionalData; + assert.ok(hooks, 'Execution lifecycle hooks are not defined'); + triggerFunctions.emit = ( - (resolveEmit) => - ( - data: INodeExecutionData[][], - responsePromise?: IDeferredPromise, - donePromise?: IDeferredPromise, - ) => { - additionalData.hooks!.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - if (responsePromise) { - responsePromise.resolve(response); - } - }, - ]; - - if (donePromise) { - additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift( - async (runData: IRun): Promise => { - return donePromise.resolve(runData); - }, - ); - } - - resolveEmit(data); + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, + ) => { + if (responsePromise) { + hooks.addHandler('sendResponse', (response) => responsePromise.resolve(response)); } - )(resolve); + + if (donePromise) { + hooks.addHandler('workflowExecuteAfter', (runData) => donePromise.resolve(runData)); + } + + resolve(data); + }; + triggerFunctions.emitError = ( - (rejectEmit) => - (error: Error, responsePromise?: IDeferredPromise) => { - additionalData.hooks!.hookFunctions.sendResponse = [ - async (): Promise => { - if (responsePromise) { - responsePromise.reject(error); - } - }, - ]; - - rejectEmit(error); + error: Error, + responsePromise?: IDeferredPromise, + ) => { + if (responsePromise) { + hooks.addHandler('sendResponse', () => responsePromise.reject(error)); } - )(reject); + reject(error); + }; }); return triggerResponse; diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index 2e0e89f26a..00a7a580a3 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -405,19 +405,6 @@ export class WorkflowExecute { return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); } - /** - * Executes the hook with the given name - * - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - async executeHook(hookName: string, parameters: any[]): Promise { - if (this.additionalData.hooks === undefined) { - return; - } - - return await this.additionalData.hooks.executeHookFunctions(hookName, parameters); - } - /** * Merges temporary execution metadata into the final runData structure. * During workflow execution, metadata is collected in a temporary location @@ -1207,11 +1194,14 @@ export class WorkflowExecute { this.status = 'running'; + const { hooks, executionId } = this.additionalData; + assert.ok(hooks, 'Failed to run workflow due to missing execution lifecycle hooks'); + if (!this.runExecutionData.executionData) { throw new ApplicationError('Failed to run workflow due to missing execution data', { extra: { workflowId: workflow.id, - executionid: this.additionalData.executionId, + executionId, mode: this.mode, }, }); @@ -1269,14 +1259,14 @@ export class WorkflowExecute { this.status = 'canceled'; this.abortController.abort(); const fullRunData = this.getFullRunData(startedAt); - void this.executeHook('workflowExecuteAfter', [fullRunData]); + void hooks.runHook('workflowExecuteAfter', [fullRunData]); }); // eslint-disable-next-line complexity const returnPromise = (async () => { try { if (!this.additionalData.restartExecutionId) { - await this.executeHook('workflowExecuteBefore', [workflow, this.runExecutionData]); + await hooks.runHook('workflowExecuteBefore', [workflow, this.runExecutionData]); } } catch (error) { const e = error as unknown as ExecutionBaseError; @@ -1360,7 +1350,7 @@ export class WorkflowExecute { node: executionNode.name, workflowId: workflow.id, }); - await this.executeHook('nodeExecuteBefore', [executionNode.name]); + await hooks.runHook('nodeExecuteBefore', [executionNode.name]); // Get the index of the current run runIndex = 0; @@ -1651,7 +1641,7 @@ export class WorkflowExecute { this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); // Only execute the nodeExecuteAfter hook if the node did not get aborted if (!this.isCancelled) { - await this.executeHook('nodeExecuteAfter', [ + await hooks.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1693,7 +1683,7 @@ export class WorkflowExecute { this.runExecutionData.resultData.runData[executionNode.name].push(taskData); if (this.runExecutionData.waitTill) { - await this.executeHook('nodeExecuteAfter', [ + await hooks.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1712,7 +1702,7 @@ export class WorkflowExecute { ) { // Before stopping, make sure we are executing hooks so // That frontend is notified for example for manual executions. - await this.executeHook('nodeExecuteAfter', [ + await hooks.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1822,7 +1812,7 @@ export class WorkflowExecute { // Execute hooks now to make sure that all hooks are executed properly // Await is needed to make sure that we don't fall into concurrency problems // When saving node execution data - await this.executeHook('nodeExecuteAfter', [ + await hooks.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -2025,7 +2015,7 @@ export class WorkflowExecute { this.moveNodeMetadata(); - await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch( + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch( // eslint-disable-next-line @typescript-eslint/no-shadow (error) => { console.error('There was a problem running hook "workflowExecuteAfter"', error); @@ -2118,7 +2108,10 @@ export class WorkflowExecute { this.moveNodeMetadata(); // Prevent from running the hook if the error is an abort error as it was already handled if (!this.isCancelled) { - await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await this.additionalData.hooks?.runHook('workflowExecuteAfter', [ + fullRunData, + newStaticData, + ]); } if (closeFunction) { diff --git a/packages/core/src/node-execute-functions.ts b/packages/core/src/node-execute-functions.ts index cb4e7d2b52..34a4757962 100644 --- a/packages/core/src/node-execute-functions.ts +++ b/packages/core/src/node-execute-functions.ts @@ -263,7 +263,7 @@ export async function proxyRequestToAxios( } else if (body === '') { body = axiosConfig.responseType === 'arraybuffer' ? Buffer.alloc(0) : undefined; } - await additionalData?.hooks?.executeHookFunctions('nodeFetchedData', [workflow?.id, node]); + await additionalData?.hooks?.runHook('nodeFetchedData', [workflow?.id, node]); return configObject.resolveWithFullResponse ? { body, diff --git a/packages/core/test/helpers/index.ts b/packages/core/test/helpers/index.ts index 28cec2dff2..27008b177a 100644 --- a/packages/core/test/helpers/index.ts +++ b/packages/core/test/helpers/index.ts @@ -6,7 +6,6 @@ import type { INodeType, INodeTypes, IRun, - ITaskData, IVersionedNodeType, IWorkflowBase, IWorkflowExecuteAdditionalData, @@ -14,10 +13,11 @@ import type { WorkflowTestData, INodeTypeData, } from 'n8n-workflow'; -import { ApplicationError, NodeHelpers, WorkflowHooks } from 'n8n-workflow'; +import { ApplicationError, NodeHelpers } from 'n8n-workflow'; import path from 'path'; import { UnrecognizedNodeTypeError } from '@/errors'; +import { ExecutionLifecycleHooks } from '@/execution-engine/execution-lifecycle-hooks'; import { predefinedNodesTypes } from './constants'; @@ -53,22 +53,12 @@ export function WorkflowExecuteAdditionalData( waitPromise: IDeferredPromise, nodeExecutionOrder: string[], ): IWorkflowExecuteAdditionalData { - const hookFunctions = { - nodeExecuteAfter: [ - async (nodeName: string, _data: ITaskData): Promise => { - nodeExecutionOrder.push(nodeName); - }, - ], - workflowExecuteAfter: [ - async (fullRunData: IRun): Promise => { - waitPromise.resolve(fullRunData); - }, - ], - }; - - return mock({ - hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()), + const hooks = new ExecutionLifecycleHooks('trigger', '1', mock()); + hooks.addHandler('nodeExecuteAfter', (nodeName) => { + nodeExecutionOrder.push(nodeName); }); + hooks.addHandler('workflowExecuteAfter', (fullRunData) => waitPromise.resolve(fullRunData)); + return mock({ hooks }); } const preparePinData = (pinData: IDataObject) => { diff --git a/packages/nodes-base/test/nodes/Helpers.ts b/packages/nodes-base/test/nodes/Helpers.ts index 81e68b2d05..13ffe88de5 100644 --- a/packages/nodes-base/test/nodes/Helpers.ts +++ b/packages/nodes-base/test/nodes/Helpers.ts @@ -8,6 +8,7 @@ import { Credentials, UnrecognizedNodeTypeError, constructExecutionMetaData, + ExecutionLifecycleHooks, } from 'n8n-core'; import type { CredentialLoadingDetails, @@ -28,14 +29,13 @@ import type { INodeTypeData, INodeTypes, IRun, - ITaskData, IVersionedNodeType, IWorkflowBase, IWorkflowExecuteAdditionalData, NodeLoadingDetails, WorkflowTestData, } from 'n8n-workflow'; -import { ApplicationError, ICredentialsHelper, NodeHelpers, WorkflowHooks } from 'n8n-workflow'; +import { ApplicationError, ICredentialsHelper, NodeHelpers } from 'n8n-workflow'; import { tmpdir } from 'os'; import path from 'path'; @@ -155,22 +155,15 @@ export function WorkflowExecuteAdditionalData( waitPromise: IDeferredPromise, nodeExecutionOrder: string[], ): IWorkflowExecuteAdditionalData { - const hookFunctions = { - nodeExecuteAfter: [ - async (nodeName: string, _data: ITaskData): Promise => { - nodeExecutionOrder.push(nodeName); - }, - ], - workflowExecuteAfter: [ - async (fullRunData: IRun): Promise => { - waitPromise.resolve(fullRunData); - }, - ], - }; + const hooks = new ExecutionLifecycleHooks('trigger', '1', mock()); + hooks.addHandler('nodeExecuteAfter', (nodeName) => { + nodeExecutionOrder.push(nodeName); + }); + hooks.addHandler('workflowExecuteAfter', (fullRunData) => waitPromise.resolve(fullRunData)); return mock({ credentialsHelper: new CredentialsHelper(), - hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()), + hooks, // Get from node.parameters currentNodeParameters: undefined, }); diff --git a/packages/nodes-base/test/nodes/TriggerHelpers.ts b/packages/nodes-base/test/nodes/TriggerHelpers.ts index 5348cab08c..ce31937992 100644 --- a/packages/nodes-base/test/nodes/TriggerHelpers.ts +++ b/packages/nodes-base/test/nodes/TriggerHelpers.ts @@ -3,7 +3,8 @@ import { mock } from 'jest-mock-extended'; import get from 'lodash/get'; import merge from 'lodash/merge'; import set from 'lodash/set'; -import { PollContext, returnJsonArray, type InstanceSettings } from 'n8n-core'; +import { PollContext, returnJsonArray } from 'n8n-core'; +import type { InstanceSettings, ExecutionLifecycleHooks } from 'n8n-core'; import { ScheduledTaskManager } from 'n8n-core/dist/execution-engine/scheduled-task-manager'; import type { IBinaryData, @@ -19,7 +20,6 @@ import type { NodeTypeAndVersion, VersionedNodeType, Workflow, - WorkflowHooks, } from 'n8n-workflow'; type MockDeepPartial = Parameters>[0]; @@ -212,7 +212,7 @@ export async function testPollingTriggerNode( return options as IHttpRequestOptions; }, }), - hooks: mock(), + hooks: mock(), }), mode, 'init', diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 3a7f530227..dbba497357 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -24,7 +24,6 @@ import type { ExecutionStatus } from './ExecutionStatus'; import type { Result } from './result'; import type { Workflow } from './Workflow'; import type { EnvProviderState } from './WorkflowDataProxyEnvProvider'; -import type { WorkflowHooks } from './WorkflowHooks'; export interface IAdditionalCredentialOptions { oauth2?: IOAuth2Options; @@ -2237,17 +2236,6 @@ export interface IWorkflowCredentials { }; } -export interface IWorkflowExecuteHooks { - [key: string]: Array<(...args: any[]) => Promise> | undefined; - nodeExecuteAfter?: Array< - (nodeName: string, data: ITaskData, executionData: IRunExecutionData) => Promise - >; - nodeExecuteBefore?: Array<(nodeName: string) => Promise>; - workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise>; - workflowExecuteBefore?: Array<(workflow?: Workflow, data?: IRunExecutionData) => Promise>; - sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; -} - export interface IWorkflowExecutionDataProcess { destinationNode?: string; restartExecutionId?: string; @@ -2325,7 +2313,6 @@ export interface IWorkflowExecuteAdditionalData { ) => Promise; executionId?: string; restartExecutionId?: string; - hooks?: WorkflowHooks; httpResponse?: express.Response; httpRequest?: express.Request; restApiUrl: string; diff --git a/packages/workflow/src/WorkflowHooks.ts b/packages/workflow/src/WorkflowHooks.ts deleted file mode 100644 index d7e05b9a1a..0000000000 --- a/packages/workflow/src/WorkflowHooks.ts +++ /dev/null @@ -1,33 +0,0 @@ -import type { IWorkflowBase, IWorkflowExecuteHooks, WorkflowExecuteMode } from './Interfaces'; - -export class WorkflowHooks { - mode: WorkflowExecuteMode; - - workflowData: IWorkflowBase; - - executionId: string; - - hookFunctions: IWorkflowExecuteHooks; - - constructor( - hookFunctions: IWorkflowExecuteHooks, - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, - ) { - this.hookFunctions = hookFunctions; - this.mode = mode; - this.executionId = executionId; - this.workflowData = workflowData; - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - async executeHookFunctions(hookName: string, parameters: any[]) { - const hooks = this.hookFunctions[hookName]; - if (hooks !== undefined && Array.isArray(hooks)) { - for (const hookFunction of hooks) { - await hookFunction.apply(this, parameters); - } - } - } -} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 4b32c2f601..111bc16a32 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -18,7 +18,6 @@ export * from './NodeHelpers'; export * from './Workflow'; export * from './WorkflowDataProxy'; export * from './WorkflowDataProxyEnvProvider'; -export * from './WorkflowHooks'; export * from './VersionedNodeType'; export * from './TypeValidation'; export * from './result';