From 80180230b14a71f47a7c8cad46254aa2e48c0c3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 23 Dec 2024 14:39:41 +0100 Subject: [PATCH] refactor(core): Move WorkflowHooks to core, improve type-safety, and add tests --- .../workflow-execute-additional-data.test.ts | 14 +- .../cli/src/__tests__/workflow-runner.test.ts | 64 +- .../__tests__/execution-hooks-factory.test.ts | 785 +++++++++++++++++ .../execution-hooks-factory.ts | 464 ++++++++++ .../save-execution-progress.ts | 1 + .../shared/shared-hook-functions.ts | 92 +- .../execution-recovery.service.test.ts | 4 + .../executions/execution-recovery.service.ts | 7 +- .../__tests__/job-processor.service.test.ts | 1 + packages/cli/src/scaling/job-processor.ts | 26 +- .../__tests__/object-to-error.test.ts | 2 +- packages/cli/src/utils/object-to-error.ts | 53 ++ .../src/workflow-execute-additional-data.ts | 808 +----------------- packages/cli/src/workflow-runner.ts | 236 ++++- packages/core/src/Interfaces.ts | 8 + packages/core/src/NodeExecuteFunctions.ts | 2 +- packages/core/src/TriggersAndPollers.ts | 28 +- packages/core/src/WorkflowExecute.ts | 49 +- .../src/__tests__/execution-hooks.test.ts | 114 +++ packages/core/src/execution-hooks.ts | 119 +++ packages/core/src/index.ts | 1 + .../__tests__/shared-tests.ts | 4 +- .../base-execute-context.ts | 2 +- .../node-execution-context/execute-context.ts | 2 +- .../supply-data-context.ts | 4 +- .../core/test/NodeExecuteFunctions.test.ts | 23 +- packages/core/test/TriggersAndPollers.test.ts | 20 +- packages/core/test/helpers/index.ts | 26 +- packages/nodes-base/test/nodes/Helpers.ts | 25 +- .../nodes-base/test/nodes/TriggerHelpers.ts | 6 +- packages/workflow/src/Interfaces.ts | 21 +- packages/workflow/src/WorkflowHooks.ts | 49 -- packages/workflow/src/index.ts | 1 - 33 files changed, 1890 insertions(+), 1171 deletions(-) create mode 100644 packages/cli/src/execution-lifecycle-hooks/__tests__/execution-hooks-factory.test.ts create mode 100644 packages/cli/src/execution-lifecycle-hooks/execution-hooks-factory.ts rename packages/cli/src/{ => utils}/__tests__/object-to-error.test.ts (94%) create mode 100644 packages/cli/src/utils/object-to-error.ts create mode 100644 packages/core/src/__tests__/execution-hooks.test.ts create mode 100644 packages/core/src/execution-hooks.ts delete mode 100644 packages/workflow/src/WorkflowHooks.ts diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index 641e239393..3501a2a169 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -17,13 +17,15 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { VariablesService } from '@/environments.ee/variables/variables.service.ee'; import { EventService } from '@/events/event.service'; +import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory'; import { ExternalHooks } from '@/external-hooks'; +import { Push } from '@/push'; import { SecretsHelper } from '@/secrets-helpers.ee'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; import { Telemetry } from '@/telemetry'; import { PermissionChecker } from '@/user-management/permission-checker'; -import { executeWorkflow, getBase, getRunData } from '@/workflow-execute-additional-data'; +import { executeSubWorkflow, getBase, getRunData } from '@/workflow-execute-additional-data'; import { mockInstance } from '@test/mocking'; const EXECUTION_ID = '123'; @@ -78,6 +80,8 @@ jest.mock('n8n-core', () => ({ })); describe('WorkflowExecuteAdditionalData', () => { + mockInstance(Push); + mockInstance(ExecutionHooksFactory); const variablesService = mockInstance(VariablesService); variablesService.getAllCached.mockResolvedValue([]); const credentialsHelper = mockInstance(CredentialsHelper); @@ -128,7 +132,7 @@ describe('WorkflowExecuteAdditionalData', () => { }); it('should execute workflow, return data and execution id', async () => { - const response = await executeWorkflow( + const response = await executeSubWorkflow( mock(), mock(), mock({ loadedWorkflowData: undefined, doNotWaitToFinish: false }), @@ -141,7 +145,7 @@ describe('WorkflowExecuteAdditionalData', () => { }); it('should execute workflow, skip waiting', async () => { - const response = await executeWorkflow( + const response = await executeSubWorkflow( mock(), mock(), mock({ loadedWorkflowData: undefined, doNotWaitToFinish: true }), @@ -154,7 +158,7 @@ describe('WorkflowExecuteAdditionalData', () => { }); it('should set sub workflow execution as running', async () => { - await executeWorkflow( + await executeSubWorkflow( mock(), mock(), mock({ loadedWorkflowData: undefined }), @@ -167,7 +171,7 @@ describe('WorkflowExecuteAdditionalData', () => { const waitTill = new Date(); runWithData.waitTill = waitTill; - const response = await executeWorkflow( + const response = await executeSubWorkflow( mock(), mock(), mock({ loadedWorkflowData: undefined, doNotWaitToFinish: false }), diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 683343b44c..2ed2faed96 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -11,18 +11,15 @@ import type { IWorkflowExecutionDataProcess, StartNodeData, } from 'n8n-workflow'; -import { - Workflow, - WorkflowHooks, - type ExecutionError, - type IWorkflowExecuteHooks, -} from 'n8n-workflow'; +import { Workflow, type ExecutionError } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; import Container from 'typedi'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; +import type { ExecutionEntity } from '@/databases/entities/execution-entity'; import type { User } from '@/databases/entities/user'; +import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import { Telemetry } from '@/telemetry'; import { PermissionChecker } from '@/user-management/permission-checker'; @@ -36,25 +33,14 @@ import { setupTestServer } from '@test-integration/utils'; let owner: User; let runner: WorkflowRunner; -let hookFunctions: IWorkflowExecuteHooks; setupTestServer({ endpointGroups: [] }); mockInstance(Telemetry); -class Watchers { - workflowExecuteAfter = jest.fn(); -} -const watchers = new Watchers(); -const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter'); - beforeAll(async () => { owner = await createUser({ role: 'global:owner' }); runner = Container.get(WorkflowRunner); - - hookFunctions = { - workflowExecuteAfter: [watchers.workflowExecuteAfter], - }; }); afterAll(() => { @@ -67,48 +53,44 @@ beforeEach(async () => { }); describe('processError', () => { + let workflow: WorkflowEntity; + let execution: ExecutionEntity; + let hooks: core.ExecutionHooks; + + const watcher = mock<{ workflowExecuteAfter: () => Promise }>(); + + beforeEach(async () => { + jest.clearAllMocks(); + workflow = await createWorkflow({}, owner); + execution = await createExecution({ status: 'success', finished: true }, workflow); + hooks = new core.ExecutionHooks('webhook', execution.id, workflow); + hooks.addHook('workflowExecuteAfter', watcher.workflowExecuteAfter); + }); + test('processError should return early in Bull stalled edge case', async () => { - const workflow = await createWorkflow({}, owner); - const execution = await createExecution( - { - status: 'success', - finished: true, - }, - workflow, - ); config.set('executions.mode', 'queue'); await runner.processError( new Error('test') as ExecutionError, new Date(), 'webhook', execution.id, - new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + hooks, ); - expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0); + expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0); }); test('processError should return early if the error is `ExecutionNotFoundError`', async () => { - const workflow = await createWorkflow({}, owner); - const execution = await createExecution({ status: 'success', finished: true }, workflow); await runner.processError( new ExecutionNotFoundError(execution.id), new Date(), 'webhook', execution.id, - new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + hooks, ); - expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0); + expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0); }); test('processError should process error', async () => { - const workflow = await createWorkflow({}, owner); - const execution = await createExecution( - { - status: 'success', - finished: true, - }, - workflow, - ); await Container.get(ActiveExecutions).add( { executionMode: 'webhook', workflowData: workflow }, execution.id, @@ -119,9 +101,9 @@ describe('processError', () => { new Date(), 'webhook', execution.id, - new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + hooks, ); - expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1); + expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-hooks-factory.test.ts b/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-hooks-factory.test.ts new file mode 100644 index 0000000000..c38549e39c --- /dev/null +++ b/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-hooks-factory.test.ts @@ -0,0 +1,785 @@ +import { mock } from 'jest-mock-extended'; +import type { ErrorReporter, ExecutionHooksOptionalParameters } from 'n8n-core'; +import type { + IWorkflowBase, + IWorkflowExecutionDataProcess, + ITaskData, + IRunExecutionData, + IRun, + IDataObject, + NodeOperationError, + WorkflowExecuteMode, +} from 'n8n-workflow'; + +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { EventService } from '@/events/event.service'; +import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory'; +import type { ExternalHooks } from '@/external-hooks'; +import type { IExecutionResponse } from '@/interfaces'; +import type { Push } from '@/push'; +import type { ExecutionMetadataService } from '@/services/execution-metadata.service'; +import type { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; +import type { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; +import { mockInstance } from '@test/mocking'; + +describe('ExecutionHooksFactory', () => { + const errorReporter = mock(); + const executionRepository = mockInstance(ExecutionRepository); + const externalHooks = mock(); + const workflowStatisticsService = mock(); + const workflowStaticDataService = mock(); + const executionMetadataService = mock(); + const eventService = mock(); + const push = mock(); + + const hooksFactory = new ExecutionHooksFactory( + mock(), + errorReporter, + executionRepository, + externalHooks, + workflowStatisticsService, + workflowStaticDataService, + executionMetadataService, + eventService, + push, + ); + + const workflowId = 'workflow_id'; + const executionId = '123'; + const pushRef = 'abcd'; + const workflowData = mock({ + id: workflowId, + settings: {}, + nodes: [], + }); + const optionalParameters: ExecutionHooksOptionalParameters = { + retryOf: 'retry123', + pushRef, + }; + + beforeEach(() => { + jest.clearAllMocks(); + workflowData.settings = {}; + }); + + describe('forExecutionOnMain', () => { + const executionData = mock({ + executionMode: 'manual', + workflowData, + pushRef, + retryOf: 'retry123', + }); + const newStaticData: IDataObject = { newKey: 'newValue' }; + const fullRunData: IRun = { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + }; + + it('should add all required hooks', () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + const registeredHooks = (hooks as any).registered; + expect(registeredHooks.nodeExecuteBefore).toHaveLength(1); + expect(registeredHooks.nodeExecuteAfter).toHaveLength(2); + expect(registeredHooks.workflowExecuteBefore).toHaveLength(2); + expect(registeredHooks.workflowExecuteAfter).toHaveLength(2); + expect(registeredHooks.nodeFetchedData).toHaveLength(0); + expect(registeredHooks.sendResponse).toHaveLength(0); + }); + + it('should create hooks and execute them correctly', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + // Test workflowExecuteBefore hook + await hooks.executeHook('workflowExecuteBefore', []); + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']); + + // Test nodeExecuteBefore hook + await hooks.executeHook('nodeExecuteBefore', ['testNode']); + expect(push.send).toHaveBeenCalledWith( + { + type: 'nodeExecuteBefore', + data: { executionId, nodeName: 'testNode' }, + }, + pushRef, + ); + push.send.mockClear(); + + // Test nodeExecuteAfter hook + const taskData = mock({}); + const runExecutionData: IRunExecutionData = { resultData: { runData: {} } }; + + await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); + + expect(push.send).toHaveBeenCalledWith( + { + type: 'nodeExecuteAfter', + data: { executionId, nodeName: 'testNode', data: taskData }, + }, + pushRef, + ); + push.send.mockClear(); + + // Test workflowExecuteAfter hook + await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + expect(push.send).toHaveBeenCalledWith( + { + type: 'executionFinished', + data: { + executionId, + workflowId, + status: 'success', + rawData: '[{"resultData":"1"},{"runData":"2"},{}]', + }, + }, + pushRef, + ); + expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { + workflowData, + fullRunData, + }); + }); + + it('should handle waiting status in workflowExecuteAfter', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + await hooks.executeHook('workflowExecuteAfter', [ + { + ...fullRunData, + status: 'waiting', + waitTill: new Date('2099-12-31'), + }, + newStaticData, + ]); + + expect(push.send).toHaveBeenCalledWith( + { type: 'executionWaiting', data: { executionId } }, + pushRef, + ); + }); + + describe('static data', () => { + it('should not update for manual executions', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled(); + }); + + it('should update for non-manual executions', async () => { + const hooks = hooksFactory.forExecutionOnMain( + { + ...executionData, + executionMode: 'webhook', + }, + executionId, + ); + + await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith( + workflowData.id, + newStaticData, + ); + }); + }); + + describe('execution saving', () => { + it('should cleanup manual executions, if saving is disabled', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + // Mock workflow settings to not save manual executions + workflowData.settings = { saveManualExecutions: false }; + + await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + expect(executionRepository.softDelete).toHaveBeenCalledWith(executionId); + }); + + it('should handle execution progress saving', async () => { + workflowData.settings = { saveExecutionProgress: true }; + + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + const taskData = mock({}); + const runExecutionData: IRunExecutionData = { + resultData: { + runData: { + testNode: [], + }, + }, + }; + const fullExecutionData = mock({ + finished: false, + data: runExecutionData, + }); + executionRepository.findSingleExecution + .calledWith(executionId) + .mockResolvedValue(fullExecutionData); + + await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); + + expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode'); + expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( + executionId, + fullExecutionData, + ); + }); + }); + + describe('error handling', () => { + it('should handle error workflows when execution fails', async () => { + // Set workflow settings to save error executions + workflowData.settings = { saveDataErrorExecution: 'all' }; + + const hooks = hooksFactory.forExecutionOnMain( + { + ...executionData, + executionMode: 'webhook', + }, + executionId, + ); + + const executeErrorWorkflowSpy = jest.spyOn( + WorkflowExecuteAdditionalData, + 'executeErrorWorkflow', + ); + + const failedRunData: IRun = { + ...fullRunData, + status: 'error', + data: { + resultData: { + runData: {}, + error: mock(), + }, + }, + }; + + await hooks.executeHook('workflowExecuteAfter', [failedRunData, newStaticData]); + + // Verify error workflow execution + expect(executeErrorWorkflowSpy).toHaveBeenCalledWith( + workflowData, + failedRunData, + 'webhook', + executionId, + 'retry123', + ); + + executeErrorWorkflowSpy.mockRestore(); + }); + + it('should handle static data save errors', async () => { + const hooks = hooksFactory.forExecutionOnMain( + { + ...executionData, + executionMode: 'webhook', + }, + executionId, + ); + + const error = new Error('Static data save failed'); + workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error); + + await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + + it('should handle execution save errors', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + const error = new Error('DB save failed'); + executionRepository.updateExistingExecution.mockRejectedValueOnce(error); + + await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + }); + + describe('metadata handling', () => { + it('should save execution metadata when present', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + const runDataWithMetadata: IRun = { + ...fullRunData, + data: { + resultData: { + runData: {}, + metadata: { + someMetadata: 'value', + }, + }, + }, + }; + + await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]); + + expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, { + someMetadata: 'value', + }); + }); + + it('should handle metadata save errors gracefully', async () => { + const hooks = hooksFactory.forExecutionOnMain(executionData, executionId); + + const error = new Error('Metadata save failed'); + executionMetadataService.save.mockRejectedValueOnce(error); + + const runDataWithMetadata: IRun = { + ...fullRunData, + data: { + resultData: { + runData: {}, + metadata: { + someMetadata: 'value', + }, + }, + }, + }; + + await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + }); + }); + + describe('forExecutionOnWorker', () => { + it('should add all required hooks', () => { + const hooks = hooksFactory.forExecutionOnWorker( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const registeredHooks = (hooks as any).registered; + + expect(registeredHooks.nodeExecuteBefore).toHaveLength(0); + expect(registeredHooks.nodeExecuteAfter).toHaveLength(1); + expect(registeredHooks.workflowExecuteBefore).toHaveLength(1); + expect(registeredHooks.workflowExecuteAfter).toHaveLength(1); + expect(registeredHooks.nodeFetchedData).toHaveLength(0); + expect(registeredHooks.sendResponse).toHaveLength(0); + }); + + it('should run external hook `workflow.preExecute` on workflowExecuteBefore', async () => { + const hooks = hooksFactory.forExecutionOnWorker( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + await hooks.executeHook('workflowExecuteBefore', []); + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']); + }); + + describe('workflowExecuteAfter', () => { + it('should not delete unfinished executions', async () => { + const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData); + + const unfinishedRunData: IRun = { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'running', + finished: false, + }; + + await hooks.executeHook('workflowExecuteAfter', [unfinishedRunData]); + + expect(executionRepository.hardDelete).not.toHaveBeenCalled(); + }); + + it('should handle successful executions based on save settings', async () => { + // Test when success executions should not be saved + workflowData.settings = { saveDataSuccessExecution: 'none' }; + + const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData); + + const successRunData: IRun = { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [successRunData]); + + expect(executionRepository.hardDelete).toHaveBeenCalledWith({ + workflowId, + executionId, + }); + }); + + it('should handle error executions based on save settings', async () => { + // Test when error executions should not be saved + workflowData.settings = { saveDataErrorExecution: 'none' }; + + const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData); + + const errorRunData: IRun = { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'error', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [errorRunData]); + + expect(executionRepository.hardDelete).toHaveBeenCalledWith({ + workflowId, + executionId, + }); + }); + + it('should not delete executions when save settings allow it', async () => { + // Test when success executions should be saved + workflowData.settings = { saveDataSuccessExecution: 'all' }; + + const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData); + + const successRunData: IRun = { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [successRunData]); + + expect(executionRepository.hardDelete).not.toHaveBeenCalled(); + }); + + test.each(['manual', 'trigger', 'webhook', 'schedule'] as WorkflowExecuteMode[])( + 'should handle execution mode %s', + async (mode) => { + const hooks = hooksFactory.forExecutionOnWorker(mode, executionId, workflowData); + + const runData: IRun = { + data: { resultData: { runData: {} } }, + mode, + startedAt: new Date(), + status: 'success', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [runData]); + + expect(runData.status).toBe('success'); + }, + ); + }); + }); + + describe('forSubExecution', () => { + it('should add all required hooks', () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const registeredHooks = (hooks as any).registered; + + expect(registeredHooks.nodeExecuteBefore).toHaveLength(1); + expect(registeredHooks.nodeExecuteAfter).toHaveLength(2); + expect(registeredHooks.workflowExecuteBefore).toHaveLength(2); + expect(registeredHooks.workflowExecuteAfter).toHaveLength(1); + expect(registeredHooks.nodeFetchedData).toHaveLength(1); + expect(registeredHooks.sendResponse).toHaveLength(0); + }); + + describe('preExecute hooks', () => { + it('should handle workflow pre-execution', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + await hooks.executeHook('workflowExecuteBefore', []); + + expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [ + undefined, + 'manual', + ]); + }); + + it('should handle node execution progress saving', async () => { + workflowData.settings = { saveExecutionProgress: true }; + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const taskData = mock({}); + const runExecutionData: IRunExecutionData = { + resultData: { + runData: { + testNode: [], + }, + }, + }; + const fullExecutionData = mock({ + finished: false, + data: runExecutionData, + }); + + executionRepository.findSingleExecution + .calledWith(executionId) + .mockResolvedValue(fullExecutionData); + + await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); + + expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode'); + expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( + executionId, + fullExecutionData, + ); + }); + }); + + describe('event hooks', () => { + it('should emit node execution events', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + await hooks.executeHook('nodeExecuteBefore', ['testNode']); + expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', { + executionId, + workflow: workflowData, + nodeName: 'testNode', + }); + + await hooks.executeHook('nodeExecuteAfter', ['testNode']); + expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', { + executionId, + workflow: workflowData, + nodeName: 'testNode', + }); + }); + + it('should emit workflow execution events', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + await hooks.executeHook('workflowExecuteBefore', []); + expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', { + executionId, + data: workflowData, + }); + }); + + it('should emit node data fetch events', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + const testNode = { name: 'Test Node' }; + + await hooks.executeHook('nodeFetchedData', [workflowId, testNode]); + expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', { + workflowId, + node: testNode, + }); + }); + }); + + describe('saving hooks', () => { + it('should handle successful execution saves', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const successRunData: IRun = { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [successRunData, {}]); + + expect(executionRepository.updateExistingExecution).toHaveBeenCalled(); + expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { + workflowData, + fullRunData: successRunData, + }); + }); + + it('should handle error execution saves', async () => { + workflowData.settings = { saveDataErrorExecution: 'all' }; + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const errorRunData: IRun = { + data: { + resultData: { + runData: {}, + error: mock(), + }, + }, + mode: 'manual', + startedAt: new Date(), + status: 'error', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [errorRunData, {}]); + + expect(executionRepository.updateExistingExecution).toHaveBeenCalled(); + expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { + workflowData, + fullRunData: errorRunData, + }); + }); + + it('should handle metadata updates', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const runDataWithMetadata: IRun = { + data: { + resultData: { + runData: {}, + metadata: { + parameter: 'test', + }, + }, + }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]); + + expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, { + parameter: 'test', + }); + }); + + it('should handle static data updates', async () => { + const hooks = hooksFactory.forSubExecution( + 'webhook', + executionId, + workflowData, + optionalParameters, + ); + const newStaticData = { newKey: 'newValue' }; + + await hooks.executeHook('workflowExecuteAfter', [ + { + data: { resultData: { runData: {} } }, + mode: 'webhook', + startedAt: new Date(), + status: 'success', + finished: true, + }, + newStaticData, + ]); + + expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith( + workflowId, + newStaticData, + ); + }); + }); + + describe('error handling', () => { + it('should handle execution save errors', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const error = new Error('Save failed'); + executionRepository.updateExistingExecution.mockRejectedValueOnce(error); + + await hooks.executeHook('workflowExecuteAfter', [ + { + data: { resultData: { runData: {} } }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + finished: true, + }, + {}, + ]); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + + it('should handle metadata save errors', async () => { + const hooks = hooksFactory.forSubExecution( + 'manual', + executionId, + workflowData, + optionalParameters, + ); + + const error = new Error('Metadata save failed'); + executionMetadataService.save.mockRejectedValueOnce(error); + + const runDataWithMetadata: IRun = { + data: { + resultData: { + runData: {}, + metadata: {}, + }, + }, + mode: 'manual', + startedAt: new Date(), + status: 'success', + finished: true, + }; + + await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + }); + }); +}); diff --git a/packages/cli/src/execution-lifecycle-hooks/execution-hooks-factory.ts b/packages/cli/src/execution-lifecycle-hooks/execution-hooks-factory.ts new file mode 100644 index 0000000000..03d19960a8 --- /dev/null +++ b/packages/cli/src/execution-lifecycle-hooks/execution-hooks-factory.ts @@ -0,0 +1,464 @@ +import { stringify } from 'flatted'; +import { pick } from 'lodash'; +import type { ExecutionHooksOptionalParameters } from 'n8n-core'; +import { ErrorReporter, ExecutionHooks, Logger } from 'n8n-core'; +import type { + IRun, + ExecutionStatus, + WorkflowExecuteMode, + IWorkflowBase, + IWorkflowExecutionDataProcess, +} from 'n8n-workflow'; +import { ensureError } from 'n8n-workflow'; +import { Service } from 'typedi'; + +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { EventService } from '@/events/event.service'; +import { ExternalHooks } from '@/external-hooks'; +import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces'; +import { Push } from '@/push'; +import { ExecutionMetadataService } from '@/services/execution-metadata.service'; +import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import { isWorkflowIdValid } from '@/utils'; +import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; +import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; + +import { restoreBinaryDataId } from './restore-binary-data-id'; +import { saveExecutionProgress } from './save-execution-progress'; +import { determineFinalExecutionStatus } from './shared/shared-hook-functions'; +import { toSaveSettings } from './to-save-settings'; + +@Service() +export class ExecutionHooksFactory { + constructor( + private readonly logger: Logger, + private readonly errorReporter: ErrorReporter, + private readonly executionRepository: ExecutionRepository, + private readonly externalHooks: ExternalHooks, + private readonly workflowStatisticsService: WorkflowStatisticsService, + private readonly workflowStaticDataService: WorkflowStaticDataService, + private readonly executionMetadataService: ExecutionMetadataService, + private readonly eventService: EventService, + private readonly push: Push, + ) {} + + /** Returns ExecutionHooks instance for running the main workflow */ + forExecutionOnMain( + { executionMode, workflowData, pushRef, retryOf }: IWorkflowExecutionDataProcess, + executionId: string, + ) { + const hooks = new ExecutionHooks(executionMode, executionId, workflowData, { + pushRef, + retryOf, + }); + this.addPreExecuteHooks(hooks); + this.addSavingHooks(hooks, false); + this.addPushHooks(hooks); + return hooks; + } + + /** Returns ExecutionHooks instance for main process if workflow runs via worker */ + forExecutionOnWorker( + mode: WorkflowExecuteMode, + executionId: string, + workflowData: IWorkflowBase, + optionalParameters: ExecutionHooksOptionalParameters = {}, + ) { + const hooks = new ExecutionHooks(mode, executionId, workflowData, optionalParameters); + this.addPreExecuteHooks(hooks); + + // When running with worker mode, main process executes + // Only workflowExecuteBefore + workflowExecuteAfter + // So to avoid confusion, we are removing other hooks. + + const { executionRepository } = this; + // TODO: >>> clear all nodeExecuteAfter hooks <<< + // hookFunctions.nodeExecuteAfter = []; + hooks.addHook('workflowExecuteAfter', async function (fullRunData) { + // Don't delete executions before they are finished + if (!fullRunData.finished) return; + + const executionStatus = determineFinalExecutionStatus(fullRunData); + fullRunData.status = executionStatus; + + const saveSettings = toSaveSettings(this.workflowData.settings); + + const shouldNotSave = + (executionStatus === 'success' && !saveSettings.success) || + (executionStatus !== 'success' && !saveSettings.error); + + if (shouldNotSave) { + await executionRepository.hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + } + }); + + return hooks; + } + + /** Returns ExecutionHooks instance for running sub-workflows */ + forSubExecution( + mode: WorkflowExecuteMode, + executionId: string, + workflowData: IWorkflowBase, + optionalParameters: ExecutionHooksOptionalParameters, + ) { + const hooks = new ExecutionHooks(mode, executionId, workflowData, optionalParameters); + this.addPreExecuteHooks(hooks); + this.addEventHooks(hooks); + this.addSavingHooks(hooks, false); + return hooks; + } + + private addPreExecuteHooks(hooks: ExecutionHooks) { + const { externalHooks } = this; + + hooks.addHook('workflowExecuteBefore', async function (workflow) { + await externalHooks.run('workflow.preExecute', [workflow, this.mode]); + }); + + // TODO: skip this if saveSettings.progress is not true + hooks.addHook('nodeExecuteAfter', async function (nodeName, data, executionData) { + await saveExecutionProgress( + this.workflowData, + this.executionId, + nodeName, + data, + executionData, + this.pushRef, + ); + }); + } + + private addEventHooks(hooks: ExecutionHooks) { + const { eventService, workflowStatisticsService } = this; + hooks.addHook('nodeExecuteBefore', async function (nodeName) { + const { executionId, workflowData: workflow } = this; + eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); + }); + + hooks.addHook('nodeExecuteAfter', async function (nodeName) { + const { executionId, workflowData: workflow } = this; + eventService.emit('node-post-execute', { executionId, workflow, nodeName }); + }); + + hooks.addHook('workflowExecuteBefore', async function () { + const { executionId, workflowData } = this; + eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); + }); + + hooks.addHook('nodeFetchedData', async (workflowId, node) => { + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); + }); + } + + /** Returns hook functions to save workflow execution and call error workflow */ + private addSavingHooks(hooks: ExecutionHooks, isWorker: boolean) { + const { + errorReporter, + executionRepository, + logger, + workflowStatisticsService, + workflowStaticDataService, + } = this; + // eslint-disable-next-line @typescript-eslint/no-this-alias + const factory = this; + + // eslint-disable-next-line complexity + hooks.addHook('workflowExecuteAfter', async function (fullRunData, newStaticData) { + logger.debug('Executing hook (hookFunctionsSave)', { + executionId: this.executionId, + workflowId: this.workflowData.id, + isWorker, + }); + + // TODO: why is this skipped in the worker? + if (!isWorker) { + await restoreBinaryDataId(fullRunData, this.executionId, this.mode); + } + + const isManualMode = this.mode === 'manual'; + + try { + if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { + // Workflow is saved so update in database + try { + await workflowStaticDataService.saveStaticDataById(this.workflowData.id, newStaticData); + } catch (e) { + errorReporter.error(e); + logger.error( + `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${ensureError(e).message}" (hookFunctionsSave)`, + { executionId: this.executionId, workflowId: this.workflowData.id }, + ); + } + } + + const executionStatus = determineFinalExecutionStatus(fullRunData); + fullRunData.status = executionStatus; + + const saveSettings = toSaveSettings(this.workflowData.settings); + + if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { + /** + * When manual executions are not being saved, we only soft-delete + * the execution so that the user can access its binary data + * while building their workflow. + * + * The manual execution and its binary data will be hard-deleted + * on the next pruning cycle after the grace period set by + * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. + */ + await executionRepository.softDelete(this.executionId); + + return; + } + + const shouldNotSave = + (executionStatus === 'success' && !saveSettings.success) || + (executionStatus !== 'success' && !saveSettings.error); + + if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { + WorkflowExecuteAdditionalData.executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + + await executionRepository.hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + + return; + } + + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. + const fullExecutionData = factory.prepareExecutionDataForDbUpdate({ + runData: fullRunData, + workflowData: this.workflowData, + workflowStatusFinal: executionStatus, + retryOf: this.retryOf, + }); + + // When going into the waiting state, store the pushRef in the execution-data + if (fullRunData.waitTill && isManualMode) { + fullExecutionData.data.pushRef = this.pushRef; + } + + await factory.updateExistingExecution({ + executionId: this.executionId, + workflowId: this.workflowData.id, + executionData: fullExecutionData, + }); + + if (!isManualMode) { + WorkflowExecuteAdditionalData.executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + } + } catch (error) { + errorReporter.error(error); + logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, { + executionId: this.executionId, + workflowId: this.workflowData.id, + error: ensureError(error), + }); + if (!isManualMode) { + WorkflowExecuteAdditionalData.executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + } + } finally { + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, + fullRunData, + }); + } + }); + } + + /** Returns hook functions to push data to Editor-UI */ + private addPushHooks(hooks: ExecutionHooks) { + const { logger, push } = this; + + hooks.addHook('nodeExecuteBefore', async function (nodeName) { + const { pushRef, executionId } = this; + // Push data to session which started workflow before each + // node which starts rendering + if (pushRef === undefined) { + return; + } + + logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { + executionId, + pushRef, + workflowId: this.workflowData.id, + }); + + push.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); + }); + + hooks.addHook('nodeExecuteAfter', async function (nodeName, data) { + const { pushRef, executionId } = this; + // Push data to session which started workflow after each rendered node + if (pushRef === undefined) { + return; + } + + logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { + executionId, + pushRef, + workflowId: this.workflowData.id, + }); + + push.send({ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, pushRef); + }); + + hooks.addHook('workflowExecuteBefore', async function (_workflow, data) { + const { pushRef, executionId } = this; + const { id: workflowId, name: workflowName } = this.workflowData; + logger.debug('Executing hook (hookFunctionsPush)', { + executionId, + pushRef, + workflowId, + }); + // Push data to session which started the workflow + if (pushRef === undefined) { + return; + } + push.send( + { + type: 'executionStarted', + data: { + executionId, + mode: this.mode, + startedAt: new Date(), + retryOf: this.retryOf, + workflowId, + workflowName, + flattedRunData: data?.resultData.runData + ? stringify(data.resultData.runData) + : stringify({}), + }, + }, + pushRef, + ); + }); + + hooks.addHook('workflowExecuteAfter', async function (fullRunData) { + const { pushRef, executionId } = this; + if (pushRef === undefined) return; + + const { id: workflowId } = this.workflowData; + logger.debug('Executing hook (hookFunctionsPush)', { + executionId, + pushRef, + workflowId, + }); + + const { status } = fullRunData; + if (status === 'waiting') { + push.send({ type: 'executionWaiting', data: { executionId } }, pushRef); + } else { + const rawData = stringify(fullRunData.data); + push.send( + { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, + pushRef, + ); + } + }); + } + + private prepareExecutionDataForDbUpdate(parameters: { + runData: IRun; + workflowData: IWorkflowBase; + workflowStatusFinal: ExecutionStatus; + retryOf?: string; + }) { + const { runData, workflowData, workflowStatusFinal, retryOf } = parameters; + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. + const pristineWorkflowData: IWorkflowBase = pick(workflowData, [ + 'id', + 'name', + 'active', + 'createdAt', + 'updatedAt', + 'nodes', + 'connections', + 'settings', + 'staticData', + 'pinData', + ]); + + const fullExecutionData: UpdateExecutionPayload = { + data: runData.data, + mode: runData.mode, + finished: runData.finished ? runData.finished : false, + startedAt: runData.startedAt, + stoppedAt: runData.stoppedAt, + workflowData: pristineWorkflowData, + waitTill: runData.waitTill, + status: workflowStatusFinal, + workflowId: pristineWorkflowData.id, + }; + + if (retryOf !== undefined) { + fullExecutionData.retryOf = retryOf.toString(); + } + + const workflowId = workflowData.id; + if (isWorkflowIdValid(workflowId)) { + fullExecutionData.workflowId = workflowId; + } + + return fullExecutionData; + } + + private async updateExistingExecution(parameters: { + executionId: string; + workflowId: string; + executionData: Partial; + }) { + const { executionId, workflowId, executionData } = parameters; + // Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here + this.logger.debug(`Save execution data to database for execution ID ${executionId}`, { + executionId, + workflowId, + finished: executionData.finished, + stoppedAt: executionData.stoppedAt, + }); + + await this.executionRepository.updateExistingExecution(executionId, executionData); + + try { + const metadata = executionData.data?.resultData.metadata; + if (metadata) { + await this.executionMetadataService.save(executionId, metadata); + } + } catch (e) { + const error = ensureError(e); + this.errorReporter.error(error); + this.logger.error(`Failed to save metadata for execution ID ${executionId}`, { error }); + } + + if (executionData.finished === true && executionData.retryOf !== undefined) { + await this.executionRepository.updateExistingExecution(executionData.retryOf, { + retrySuccessId: executionId, + }); + } + } +} diff --git a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts b/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts index 2047c9e82e..936d1e53eb 100644 --- a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts +++ b/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts @@ -13,6 +13,7 @@ export async function saveExecutionProgress( executionData: IRunExecutionData, pushRef?: string, ) { + // TODO: calculate these only once for lifecycle hooks setup const saveSettings = toSaveSettings(workflowData.settings); if (!saveSettings.progress) return; diff --git a/packages/cli/src/execution-lifecycle-hooks/shared/shared-hook-functions.ts b/packages/cli/src/execution-lifecycle-hooks/shared/shared-hook-functions.ts index 4c91222126..6179ad1cd6 100644 --- a/packages/cli/src/execution-lifecycle-hooks/shared/shared-hook-functions.ts +++ b/packages/cli/src/execution-lifecycle-hooks/shared/shared-hook-functions.ts @@ -1,12 +1,4 @@ -import pick from 'lodash/pick'; -import { Logger } from 'n8n-core'; -import { ensureError, type ExecutionStatus, type IRun, type IWorkflowBase } from 'n8n-workflow'; -import { Container } from 'typedi'; - -import { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces'; -import { ExecutionMetadataService } from '@/services/execution-metadata.service'; -import { isWorkflowIdValid } from '@/utils'; +import type { ExecutionStatus, IRun } from 'n8n-workflow'; export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus { const workflowHasCrashed = runData.status === 'crashed'; @@ -23,85 +15,3 @@ export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus { if (runData.waitTill) workflowStatusFinal = 'waiting'; return workflowStatusFinal; } - -export function prepareExecutionDataForDbUpdate(parameters: { - runData: IRun; - workflowData: IWorkflowBase; - workflowStatusFinal: ExecutionStatus; - retryOf?: string; -}) { - const { runData, workflowData, workflowStatusFinal, retryOf } = parameters; - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const pristineWorkflowData: IWorkflowBase = pick(workflowData, [ - 'id', - 'name', - 'active', - 'createdAt', - 'updatedAt', - 'nodes', - 'connections', - 'settings', - 'staticData', - 'pinData', - ]); - - const fullExecutionData: UpdateExecutionPayload = { - data: runData.data, - mode: runData.mode, - finished: runData.finished ? runData.finished : false, - startedAt: runData.startedAt, - stoppedAt: runData.stoppedAt, - workflowData: pristineWorkflowData, - waitTill: runData.waitTill, - status: workflowStatusFinal, - workflowId: pristineWorkflowData.id, - }; - - if (retryOf !== undefined) { - fullExecutionData.retryOf = retryOf.toString(); - } - - const workflowId = workflowData.id; - if (isWorkflowIdValid(workflowId)) { - fullExecutionData.workflowId = workflowId; - } - - return fullExecutionData; -} - -export async function updateExistingExecution(parameters: { - executionId: string; - workflowId: string; - executionData: Partial; -}) { - const logger = Container.get(Logger); - const { executionId, workflowId, executionData } = parameters; - // Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here - logger.debug(`Save execution data to database for execution ID ${executionId}`, { - executionId, - workflowId, - finished: executionData.finished, - stoppedAt: executionData.stoppedAt, - }); - - await Container.get(ExecutionRepository).updateExistingExecution(executionId, executionData); - - try { - if (executionData.data?.resultData.metadata) { - await Container.get(ExecutionMetadataService).save( - executionId, - executionData.data.resultData.metadata, - ); - } - } catch (e) { - const error = ensureError(e); - logger.error(`Failed to save metadata for execution ID ${executionId}`, { error }); - } - - if (executionData.finished === true && executionData.retryOf !== undefined) { - await Container.get(ExecutionRepository).updateExistingExecution(executionData.retryOf, { - retrySuccessId: executionId, - }); - } -} diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index 0e6017a2bd..f4700ee9ec 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -10,6 +10,7 @@ import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import type { EventMessageTypes as EventMessage } from '@/eventbus/event-message-classes'; import { EventMessageNode } from '@/eventbus/event-message-classes/event-message-node'; +import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory'; import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import { Push } from '@/push'; import { mockInstance } from '@test/mocking'; @@ -26,10 +27,12 @@ describe('ExecutionRecoveryService', () => { let executionRecoveryService: ExecutionRecoveryService; let executionRepository: ExecutionRepository; + let executionHooksFactory: ExecutionHooksFactory; beforeAll(async () => { await testDb.init(); executionRepository = Container.get(ExecutionRepository); + executionHooksFactory = Container.get(ExecutionHooksFactory); executionRecoveryService = new ExecutionRecoveryService( mock(), @@ -37,6 +40,7 @@ describe('ExecutionRecoveryService', () => { push, executionRepository, mock(), + executionHooksFactory, ); }); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index f307ce0677..eb7f16e882 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -9,9 +9,9 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { EventService } from '@/events/event.service'; +import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory'; import type { IExecutionResponse } from '@/interfaces'; import { Push } from '@/push'; -import { getWorkflowHooksMain } from '@/workflow-execute-additional-data'; // @TODO: Dependency cycle import type { EventMessageTypes } from '../eventbus/event-message-classes'; @@ -26,6 +26,7 @@ export class ExecutionRecoveryService { private readonly push: Push, private readonly executionRepository: ExecutionRepository, private readonly eventService: EventService, + private readonly executionHooksFactory: ExecutionHooksFactory, ) {} /** @@ -182,7 +183,7 @@ export class ExecutionRecoveryService { runData: execution, }); - const externalHooks = getWorkflowHooksMain( + const lifecycleHooks = this.executionHooksFactory.forExecutionOnMain( { userId: '', workflowData: execution.workflowData, @@ -204,6 +205,6 @@ export class ExecutionRecoveryService { status: execution.status, }; - await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]); + await lifecycleHooks.executeHook('workflowExecuteAfter', [run]); } } diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts index 73264e6382..897986c915 100644 --- a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -19,6 +19,7 @@ describe('JobProcessor', () => { mock(), mock(), mock(), + mock(), ); const result = await jobProcessor.processJob(mock()); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 5e760e40c1..bbbbfe0afe 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -8,6 +8,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory'; import { NodeTypes } from '@/node-types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; @@ -34,6 +35,7 @@ export class JobProcessor { private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, private readonly instanceSettings: InstanceSettings, + private readonly executionHooksFactory: ExecutionHooksFactory, ) { this.logger = this.logger.scoped('scaling'); } @@ -115,25 +117,23 @@ export class JobProcessor { executionTimeoutTimestamp, ); - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + additionalData.hooks = this.executionHooksFactory.forSubExecution( execution.mode, job.data.executionId, execution.workflowData, - { retryOf: execution.retryOf as string }, + { retryOf: execution.retryOf }, ); - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - const msg: RespondToWebhookMessage = { - kind: 'respond-to-webhook', - executionId, - response: this.encodeWebhookResponse(response), - workerId: this.instanceSettings.hostId, - }; + additionalData.hooks.addHook('sendResponse', async (response) => { + const msg: RespondToWebhookMessage = { + kind: 'respond-to-webhook', + executionId, + response: this.encodeWebhookResponse(response), + workerId: this.instanceSettings.hostId, + }; - await job.progress(msg); - }, - ]; + await job.progress(msg); + }); additionalData.executionId = executionId; diff --git a/packages/cli/src/__tests__/object-to-error.test.ts b/packages/cli/src/utils/__tests__/object-to-error.test.ts similarity index 94% rename from packages/cli/src/__tests__/object-to-error.test.ts rename to packages/cli/src/utils/__tests__/object-to-error.test.ts index 311f4dce55..c65676a426 100644 --- a/packages/cli/src/__tests__/object-to-error.test.ts +++ b/packages/cli/src/utils/__tests__/object-to-error.test.ts @@ -2,7 +2,7 @@ import { mock } from 'jest-mock-extended'; import type { INode } from 'n8n-workflow'; import { NodeOperationError, type Workflow } from 'n8n-workflow'; -import { objectToError } from '../workflow-execute-additional-data'; +import { objectToError } from '../object-to-error'; describe('objectToError', () => { describe('node error handling', () => { diff --git a/packages/cli/src/utils/object-to-error.ts b/packages/cli/src/utils/object-to-error.ts new file mode 100644 index 0000000000..ffb0cd8fb3 --- /dev/null +++ b/packages/cli/src/utils/object-to-error.ts @@ -0,0 +1,53 @@ +import { isObjectLiteral } from 'n8n-core'; +import { NodeOperationError } from 'n8n-workflow'; +import type { Workflow } from 'n8n-workflow'; + +export function objectToError(errorObject: unknown, workflow: Workflow): Error { + // TODO: Expand with other error types + if (errorObject instanceof Error) { + // If it's already an Error instance, return it as is. + return errorObject; + } else if ( + isObjectLiteral(errorObject) && + 'message' in errorObject && + typeof errorObject.message === 'string' + ) { + // If it's an object with a 'message' property, create a new Error instance. + let error: Error | undefined; + if ( + 'node' in errorObject && + isObjectLiteral(errorObject.node) && + typeof errorObject.node.name === 'string' + ) { + const node = workflow.getNode(errorObject.node.name); + + if (node) { + error = new NodeOperationError( + node, + errorObject as unknown as Error, + errorObject as object, + ); + } + } + + if (error === undefined) { + error = new Error(errorObject.message); + } + + if ('description' in errorObject) { + // @ts-expect-error Error descriptions are surfaced by the UI but + // not all backend errors account for this property yet. + error.description = errorObject.description as string; + } + + if ('stack' in errorObject) { + // If there's a 'stack' property, set it on the new Error instance. + error.stack = errorObject.stack as string; + } + + return error; + } else { + // If it's neither an Error nor an object with a 'message' property, create a generic Error. + return new Error('An error occurred'); + } +} diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 69395b4ec8..2d4e0d2c65 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -4,9 +4,8 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import type { PushMessage, PushType } from '@n8n/api-types'; import { GlobalConfig } from '@n8n/config'; -import { stringify } from 'flatted'; -import { ErrorReporter, Logger, WorkflowExecute, isObjectLiteral } from 'n8n-core'; -import { ApplicationError, NodeOperationError, Workflow, WorkflowHooks } from 'n8n-workflow'; +import { ErrorReporter, Logger } from 'n8n-core'; +import { ApplicationError } from 'n8n-workflow'; import type { IDataObject, IExecuteData, @@ -16,15 +15,11 @@ import type { INodeParameters, IRun, IRunExecutionData, - ITaskData, IWorkflowBase, IWorkflowExecuteAdditionalData, - IWorkflowExecuteHooks, - IWorkflowHooksOptionalParameters, IWorkflowSettings, WorkflowExecuteMode, ExecutionStatus, - ExecutionError, IExecuteFunctions, ITaskDataConnections, ExecuteWorkflowOptions, @@ -32,90 +27,27 @@ import type { EnvProviderState, ExecuteWorkflowData, RelatedExecution, + Workflow, } from 'n8n-workflow'; import { Container } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { CredentialsHelper } from '@/credentials-helper'; -import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map'; -import { ExternalHooks } from '@/external-hooks'; -import type { IWorkflowErrorData, UpdateExecutionPayload } from '@/interfaces'; -import { NodeTypes } from '@/node-types'; +import type { IWorkflowErrorData } from '@/interfaces'; import { Push } from '@/push'; -import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; -import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; +import { findSubworkflowStart } from '@/utils'; import * as WorkflowHelpers from '@/workflow-helpers'; import { WorkflowRepository } from './databases/repositories/workflow.repository'; import { EventService } from './events/event.service'; -import { restoreBinaryDataId } from './execution-lifecycle-hooks/restore-binary-data-id'; -import { saveExecutionProgress } from './execution-lifecycle-hooks/save-execution-progress'; -import { - determineFinalExecutionStatus, - prepareExecutionDataForDbUpdate, - updateExistingExecution, -} from './execution-lifecycle-hooks/shared/shared-hook-functions'; -import { toSaveSettings } from './execution-lifecycle-hooks/to-save-settings'; import { TaskManager } from './runners/task-managers/task-manager'; import { SecretsHelper } from './secrets-helpers.ee'; import { OwnershipService } from './services/ownership.service'; import { UrlService } from './services/url.service'; -import { SubworkflowPolicyChecker } from './subworkflows/subworkflow-policy-checker.service'; -import { PermissionChecker } from './user-management/permission-checker'; +import { WorkflowRunner } from './workflow-runner'; import { WorkflowExecutionService } from './workflows/workflow-execution.service'; -import { WorkflowStaticDataService } from './workflows/workflow-static-data.service'; - -export function objectToError(errorObject: unknown, workflow: Workflow): Error { - // TODO: Expand with other error types - if (errorObject instanceof Error) { - // If it's already an Error instance, return it as is. - return errorObject; - } else if ( - isObjectLiteral(errorObject) && - 'message' in errorObject && - typeof errorObject.message === 'string' - ) { - // If it's an object with a 'message' property, create a new Error instance. - let error: Error | undefined; - if ( - 'node' in errorObject && - isObjectLiteral(errorObject.node) && - typeof errorObject.node.name === 'string' - ) { - const node = workflow.getNode(errorObject.node.name); - - if (node) { - error = new NodeOperationError( - node, - errorObject as unknown as Error, - errorObject as object, - ); - } - } - - if (error === undefined) { - error = new Error(errorObject.message); - } - - if ('description' in errorObject) { - // @ts-expect-error Error descriptions are surfaced by the UI but - // not all backend errors account for this property yet. - error.description = errorObject.description as string; - } - - if ('stack' in errorObject) { - // If there's a 'stack' property, set it on the new Error instance. - error.stack = errorObject.stack as string; - } - - return error; - } else { - // If it's neither an Error nor an object with a 'message' property, create a generic Error. - return new Error('An error occurred'); - } -} /** * Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects @@ -238,434 +170,6 @@ export function executeErrorWorkflow( } } -/** - * Returns hook functions to push data to Editor-UI - * - */ -function hookFunctionsPush(): IWorkflowExecuteHooks { - const logger = Container.get(Logger); - const pushInstance = Container.get(Push); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { pushRef, executionId } = this; - // Push data to session which started workflow before each - // node which starts rendering - if (pushRef === undefined) { - return; - } - - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId, - pushRef, - workflowId: this.workflowData.id, - }); - - pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { - const { pushRef, executionId } = this; - // Push data to session which started workflow after each rendered node - if (pushRef === undefined) { - return; - } - - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId, - pushRef, - workflowId: this.workflowData.id, - }); - - pushInstance.send( - { type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, - pushRef, - ); - }, - ], - workflowExecuteBefore: [ - async function (this: WorkflowHooks, _workflow, data): Promise { - const { pushRef, executionId } = this; - const { id: workflowId, name: workflowName } = this.workflowData; - logger.debug('Executing hook (hookFunctionsPush)', { - executionId, - pushRef, - workflowId, - }); - // Push data to session which started the workflow - if (pushRef === undefined) { - return; - } - pushInstance.send( - { - type: 'executionStarted', - data: { - executionId, - mode: this.mode, - startedAt: new Date(), - retryOf: this.retryOf, - workflowId, - workflowName, - flattedRunData: data?.resultData.runData - ? stringify(data.resultData.runData) - : stringify({}), - }, - }, - pushRef, - ); - }, - ], - workflowExecuteAfter: [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - const { pushRef, executionId } = this; - if (pushRef === undefined) return; - - const { id: workflowId } = this.workflowData; - logger.debug('Executing hook (hookFunctionsPush)', { - executionId, - pushRef, - workflowId, - }); - - const { status } = fullRunData; - if (status === 'waiting') { - pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef); - } else { - const rawData = stringify(fullRunData.data); - pushInstance.send( - { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, - pushRef, - ); - } - }, - ], - }; -} - -export function hookFunctionsPreExecute(): IWorkflowExecuteHooks { - const externalHooks = Container.get(ExternalHooks); - return { - workflowExecuteBefore: [ - async function (this: WorkflowHooks, workflow: Workflow): Promise { - await externalHooks.run('workflow.preExecute', [workflow, this.mode]); - }, - ], - nodeExecuteAfter: [ - async function ( - this: WorkflowHooks, - nodeName: string, - data: ITaskData, - executionData: IRunExecutionData, - ): Promise { - await saveExecutionProgress( - this.workflowData, - this.executionId, - nodeName, - data, - executionData, - this.pushRef, - ); - }, - ], - }; -} - -/** - * Returns hook functions to save workflow execution and call error workflow - * - */ -function hookFunctionsSave(): IWorkflowExecuteHooks { - const logger = Container.get(Logger); - const workflowStatisticsService = Container.get(WorkflowStatisticsService); - const eventService = Container.get(EventService); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - workflowExecuteBefore: [], - workflowExecuteAfter: [ - async function ( - this: WorkflowHooks, - fullRunData: IRun, - newStaticData: IDataObject, - ): Promise { - logger.debug('Executing hook (hookFunctionsSave)', { - executionId: this.executionId, - workflowId: this.workflowData.id, - }); - - await restoreBinaryDataId(fullRunData, this.executionId, this.mode); - - const isManualMode = this.mode === 'manual'; - - try { - if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { - // Workflow is saved so update in database - try { - await Container.get(WorkflowStaticDataService).saveStaticDataById( - this.workflowData.id, - newStaticData, - ); - } catch (e) { - Container.get(ErrorReporter).error(e); - logger.error( - `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, - { executionId: this.executionId, workflowId: this.workflowData.id }, - ); - } - } - - const executionStatus = determineFinalExecutionStatus(fullRunData); - fullRunData.status = executionStatus; - - const saveSettings = toSaveSettings(this.workflowData.settings); - - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { - /** - * When manual executions are not being saved, we only soft-delete - * the execution so that the user can access its binary data - * while building their workflow. - * - * The manual execution and its binary data will be hard-deleted - * on the next pruning cycle after the grace period set by - * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. - */ - await Container.get(ExecutionRepository).softDelete(this.executionId); - - return; - } - - const shouldNotSave = - (executionStatus === 'success' && !saveSettings.success) || - (executionStatus !== 'success' && !saveSettings.error); - - if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - - return; - } - - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const fullExecutionData = prepareExecutionDataForDbUpdate({ - runData: fullRunData, - workflowData: this.workflowData, - workflowStatusFinal: executionStatus, - retryOf: this.retryOf, - }); - - // When going into the waiting state, store the pushRef in the execution-data - if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = this.pushRef; - } - - await updateExistingExecution({ - executionId: this.executionId, - workflowId: this.workflowData.id, - executionData: fullExecutionData, - }); - - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - } catch (error) { - Container.get(ErrorReporter).error(error); - logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, { - executionId: this.executionId, - workflowId: this.workflowData.id, - error, - }); - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - } finally { - workflowStatisticsService.emit('workflowExecutionCompleted', { - workflowData: this.workflowData, - fullRunData, - }); - } - }, - ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, - ], - }; -} - -/** - * Returns hook functions to save workflow execution and call error workflow - * for running with queues. Manual executions should never run on queues as - * they are always executed in the main process. - * - */ -function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { - const logger = Container.get(Logger); - const workflowStatisticsService = Container.get(WorkflowStatisticsService); - const eventService = Container.get(EventService); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - workflowExecuteBefore: [ - async function (): Promise { - const { executionId, workflowData } = this; - - eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); - }, - ], - workflowExecuteAfter: [ - async function ( - this: WorkflowHooks, - fullRunData: IRun, - newStaticData: IDataObject, - ): Promise { - logger.debug('Executing hook (hookFunctionsSaveWorker)', { - executionId: this.executionId, - workflowId: this.workflowData.id, - }); - try { - if (isWorkflowIdValid(this.workflowData.id) && newStaticData) { - // Workflow is saved so update in database - try { - await Container.get(WorkflowStaticDataService).saveStaticDataById( - this.workflowData.id, - newStaticData, - ); - } catch (e) { - Container.get(ErrorReporter).error(e); - logger.error( - `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, - { pushRef: this.pushRef, workflowId: this.workflowData.id }, - ); - } - } - - const workflowStatusFinal = determineFinalExecutionStatus(fullRunData); - fullRunData.status = workflowStatusFinal; - - if (workflowStatusFinal !== 'success' && workflowStatusFinal !== 'waiting') { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const fullExecutionData = prepareExecutionDataForDbUpdate({ - runData: fullRunData, - workflowData: this.workflowData, - workflowStatusFinal, - retryOf: this.retryOf, - }); - - await updateExistingExecution({ - executionId: this.executionId, - workflowId: this.workflowData.id, - executionData: fullExecutionData, - }); - } catch (error) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } finally { - workflowStatisticsService.emit('workflowExecutionCompleted', { - workflowData: this.workflowData, - fullRunData, - }); - } - }, - async function (this: WorkflowHooks, runData: IRun): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('workflow-post-execute', { - workflow, - executionId, - runData, - }); - }, - async function (this: WorkflowHooks, fullRunData: IRun) { - const externalHooks = Container.get(ExternalHooks); - if (externalHooks.exists('workflow.postExecute')) { - try { - await externalHooks.run('workflow.postExecute', [ - fullRunData, - this.workflowData, - this.executionId, - ]); - } catch (error) { - Container.get(ErrorReporter).error(error); - Container.get(Logger).error( - 'There was a problem running hook "workflow.postExecute"', - error, - ); - } - } - }, - ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, - ], - }; -} - export async function getRunData( workflowData: IWorkflowBase, inputData?: INodeExecutionData[], @@ -756,9 +260,9 @@ export async function getWorkflowData( } /** - * Executes the workflow with the given ID + * Executes a sub-workflow with the given ID */ -export async function executeWorkflow( +export async function executeSubWorkflow( workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, options: ExecuteWorkflowOptions, @@ -775,7 +279,7 @@ export async function executeWorkflow( const executionId = await activeExecutions.add(runData); - const executionPromise = startExecution( + const executionPromise = Container.get(WorkflowRunner).runSubWorkflow( additionalData, options, executionId, @@ -790,174 +294,6 @@ export async function executeWorkflow( return await executionPromise; } -async function startExecution( - additionalData: IWorkflowExecuteAdditionalData, - options: ExecuteWorkflowOptions, - executionId: string, - runData: IWorkflowExecutionDataProcess, - workflowData: IWorkflowBase, -): Promise { - const externalHooks = Container.get(ExternalHooks); - await externalHooks.init(); - - const nodeTypes = Container.get(NodeTypes); - const activeExecutions = Container.get(ActiveExecutions); - const eventService = Container.get(EventService); - const executionRepository = Container.get(ExecutionRepository); - - const workflowName = workflowData ? workflowData.name : undefined; - const workflow = new Workflow({ - id: workflowData.id, - name: workflowName, - nodes: workflowData.nodes, - connections: workflowData.connections, - active: workflowData.active, - nodeTypes, - staticData: workflowData.staticData, - settings: workflowData.settings, - }); - - /** - * A subworkflow execution in queue mode is not enqueued, but rather runs in the - * same worker process as the parent execution. Hence ensure the subworkflow - * execution is marked as started as well. - */ - await executionRepository.setRunning(executionId); - - Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData }); - - let data; - try { - await Container.get(PermissionChecker).check(workflowData.id, workflowData.nodes); - await Container.get(SubworkflowPolicyChecker).check( - workflow, - options.parentWorkflowId, - options.node, - additionalData.userId, - ); - - // Create new additionalData to have different workflow loaded and to call - // different webhooks - const additionalDataIntegrated = await getBase(); - additionalDataIntegrated.hooks = getWorkflowHooksIntegrated( - runData.executionMode, - executionId, - workflowData, - ); - additionalDataIntegrated.executionId = executionId; - additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager; - - // Make sure we pass on the original executeWorkflow function we received - // This one already contains changes to talk to parent process - // and get executionID from `activeExecutions` running on main process - additionalDataIntegrated.executeWorkflow = additionalData.executeWorkflow; - - let subworkflowTimeout = additionalData.executionTimeoutTimestamp; - const workflowSettings = workflowData.settings; - if (workflowSettings?.executionTimeout !== undefined && workflowSettings.executionTimeout > 0) { - // We might have received a max timeout timestamp from the parent workflow - // If we did, then we get the minimum time between the two timeouts - // If no timeout was given from the parent, then we use our timeout. - subworkflowTimeout = Math.min( - additionalData.executionTimeoutTimestamp || Number.MAX_SAFE_INTEGER, - Date.now() + workflowSettings.executionTimeout * 1000, - ); - } - - additionalDataIntegrated.executionTimeoutTimestamp = subworkflowTimeout; - - const runExecutionData = runData.executionData as IRunExecutionData; - - // Execute the workflow - const workflowExecute = new WorkflowExecute( - additionalDataIntegrated, - runData.executionMode, - runExecutionData, - ); - const execution = workflowExecute.processRunExecutionData(workflow); - activeExecutions.attachWorkflowExecution(executionId, execution); - data = await execution; - } catch (error) { - const executionError = error ? (error as ExecutionError) : undefined; - const fullRunData: IRun = { - data: { - resultData: { - error: executionError, - runData: {}, - }, - }, - finished: false, - mode: 'integrated', - startedAt: new Date(), - stoppedAt: new Date(), - status: 'error', - }; - // When failing, we might not have finished the execution - // Therefore, database might not contain finished errors. - // Force an update to db as there should be no harm doing this - - const fullExecutionData: UpdateExecutionPayload = { - data: fullRunData.data, - mode: fullRunData.mode, - finished: fullRunData.finished ? fullRunData.finished : false, - startedAt: fullRunData.startedAt, - stoppedAt: fullRunData.stoppedAt, - status: fullRunData.status, - workflowData, - workflowId: workflowData.id, - }; - if (workflowData.id) { - fullExecutionData.workflowId = workflowData.id; - } - - activeExecutions.finalizeExecution(executionId, fullRunData); - - await executionRepository.updateExistingExecution(executionId, fullExecutionData); - throw objectToError( - { - ...executionError, - stack: executionError?.stack, - message: executionError?.message, - }, - workflow, - ); - } - - await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); - - eventService.emit('workflow-post-execute', { - workflow: workflowData, - executionId, - userId: additionalData.userId, - runData: data, - }); - - // subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here - if (data.finished === true || data.status === 'waiting') { - // Workflow did finish successfully - - activeExecutions.finalizeExecution(executionId, data); - const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); - return { - executionId, - data: returnData!.data!.main, - waitTill: data.waitTill, - }; - } - activeExecutions.finalizeExecution(executionId, data); - - // Workflow did fail - const { error } = data.data.resultData; - - throw objectToError( - { - ...error, - stack: error?.stack, - }, - workflow, - ); -} - export function setExecutionStatus(status: ExecutionStatus) { const logger = Container.get(Logger); if (this.executionId === undefined) { @@ -1002,7 +338,7 @@ export async function getBase( return { credentialsHelper: Container.get(CredentialsHelper), - executeWorkflow, + executeSubWorkflow, restApiUrl: urlBaseWebhook + globalConfig.endpoints.rest, instanceBaseUrl: urlBaseWebhook, formWaitingBaseUrl: urlBaseWebhook + globalConfig.endpoints.formWaiting, @@ -1056,127 +392,3 @@ export async function getBase( eventService.emit(eventName, payload), }; } - -/** - * Returns WorkflowHooks instance for running integrated workflows - * (Workflows which get started inside of another workflow) - */ -function getWorkflowHooksIntegrated( - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, -): WorkflowHooks { - const hookFunctions = hookFunctionsSave(); - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); -} - -/** - * Returns WorkflowHooks instance for running integrated workflows - * (Workflows which get started inside of another workflow) - */ -export function getWorkflowHooksWorkerExecuter( - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, -): WorkflowHooks { - optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsSaveWorker(); - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); -} - -/** - * Returns WorkflowHooks instance for main process if workflow runs via worker - */ -export function getWorkflowHooksWorkerMain( - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, -): WorkflowHooks { - optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsPreExecute(); - - // TODO: why are workers pushing to frontend? - // TODO: simplifying this for now to just leave the bare minimum hooks - - // const hookFunctions = hookFunctionsPush(); - // const preExecuteFunctions = hookFunctionsPreExecute(); - // for (const key of Object.keys(preExecuteFunctions)) { - // if (hookFunctions[key] === undefined) { - // hookFunctions[key] = []; - // } - // hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); - // } - - // When running with worker mode, main process executes - // Only workflowExecuteBefore + workflowExecuteAfter - // So to avoid confusion, we are removing other hooks. - hookFunctions.nodeExecuteBefore = []; - hookFunctions.nodeExecuteAfter = []; - hookFunctions.workflowExecuteAfter = [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - // Don't delete executions before they are finished - if (!fullRunData.finished) return; - - const executionStatus = determineFinalExecutionStatus(fullRunData); - fullRunData.status = executionStatus; - - const saveSettings = toSaveSettings(this.workflowData.settings); - - const shouldNotSave = - (executionStatus === 'success' && !saveSettings.success) || - (executionStatus !== 'success' && !saveSettings.error); - - if (shouldNotSave) { - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - } - }, - ]; - - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); -} - -/** - * Returns WorkflowHooks instance for running the main workflow - * - */ -export function getWorkflowHooksMain( - data: IWorkflowExecutionDataProcess, - executionId: string, -): WorkflowHooks { - const hookFunctions = hookFunctionsSave(); - const pushFunctions = hookFunctionsPush(); - for (const key of Object.keys(pushFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], pushFunctions[key]); - } - - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - - if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = []; - if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = []; - - return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { - pushRef: data.pushRef, - retryOf: data.retryOf as string, - }); -} diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 30cd50d6f0..1ab2e88d17 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -2,6 +2,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ +import type { ExecutionHooks } from 'n8n-core'; import { ErrorReporter, InstanceSettings, Logger, WorkflowExecute } from 'n8n-core'; import type { ExecutionError, @@ -10,8 +11,11 @@ import type { IPinData, IRun, WorkflowExecuteMode, - WorkflowHooks, IWorkflowExecutionDataProcess, + IWorkflowExecuteAdditionalData, + ExecuteWorkflowOptions, + IWorkflowBase, + ExecuteWorkflowData, } from 'n8n-workflow'; import { ExecutionCancelledError, Workflow } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; @@ -20,19 +24,22 @@ import { Container, Service } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; +import { EventService } from '@/events/event.service'; +import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory'; import { ExternalHooks } from '@/external-hooks'; +import type { UpdateExecutionPayload } from '@/interfaces'; +import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import type { ScalingService } from '@/scaling/scaling.service'; import type { Job, JobData } from '@/scaling/scaling.types'; +import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; import { PermissionChecker } from '@/user-management/permission-checker'; +import { objectToError } from '@/utils/object-to-error'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; -import { generateFailedExecutionFromError } from '@/workflow-helpers'; +import { generateFailedExecutionFromError, getDataLastExecutedNodeData } from '@/workflow-helpers'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; -import { ExecutionNotFoundError } from './errors/execution-not-found-error'; -import { EventService } from './events/event.service'; -import { ManualExecutionService } from './manual-execution.service'; - @Service() export class WorkflowRunner { private scalingService: ScalingService; @@ -48,6 +55,7 @@ export class WorkflowRunner { private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly nodeTypes: NodeTypes, private readonly permissionChecker: PermissionChecker, + private readonly subworkflowPolicyChecker: SubworkflowPolicyChecker, private readonly eventService: EventService, private readonly instanceSettings: InstanceSettings, private readonly manualExecutionService: ManualExecutionService, @@ -59,7 +67,7 @@ export class WorkflowRunner { startedAt: Date, executionMode: WorkflowExecuteMode, executionId: string, - hooks?: WorkflowHooks, + hooks?: ExecutionHooks, ) { // This means the execution was probably cancelled and has already // been cleaned up. @@ -109,9 +117,7 @@ export class WorkflowRunner { // set the execution to failed. this.activeExecutions.finalizeExecution(executionId, fullRunData); - if (hooks) { - await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]); - } + await hooks?.executeHook('workflowExecuteAfter', [fullRunData]); } /** Run the workflow @@ -131,14 +137,12 @@ export class WorkflowRunner { try { await this.permissionChecker.check(workflowId, nodes); } catch (error) { + const executionHooksFactory = Container.get(ExecutionHooksFactory); // Create a failed execution with the data for the node, save it and abort execution const runData = generateFailedExecutionFromError(data.executionMode, error, error.node); - const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); - await workflowHooks.executeHookFunctions('workflowExecuteBefore', [ - undefined, - data.executionData, - ]); - await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); + const hooks = executionHooksFactory.forExecutionOnMain(data, executionId); + await hooks.executeHook('workflowExecuteBefore', [undefined, data.executionData]); + await hooks.executeHook('workflowExecuteAfter', [runData]); responsePromise?.reject(error); this.activeExecutions.finalizeExecution(executionId); return executionId; @@ -258,13 +262,12 @@ export class WorkflowRunner { await this.executionRepository.setRunning(executionId); // write try { - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); + const executionHooksFactory = Container.get(ExecutionHooksFactory); + additionalData.hooks = executionHooksFactory.forExecutionOnMain(data, executionId); - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - this.activeExecutions.resolveResponsePromise(executionId, response); - }, - ]; + additionalData.hooks.addHook('sendResponse', async (response) => { + this.activeExecutions.resolveResponsePromise(executionId, response); + }); additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ executionId, @@ -351,14 +354,15 @@ export class WorkflowRunner { this.scalingService = Container.get(ScalingService); } + const executionHooksFactory = Container.get(ExecutionHooksFactory); // TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds. // Check if they get retried by default and how often. let job: Job; - let hooks: WorkflowHooks; + let lifecycleHooks: ExecutionHooks; try { job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); - hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain( + lifecycleHooks = executionHooksFactory.forExecutionOnWorker( data.executionMode, executionId, data.workflowData, @@ -367,11 +371,11 @@ export class WorkflowRunner { // Normally also workflow should be supplied here but as it only used for sending // data to editor-UI is not needed. - await hooks.executeHookFunctions('workflowExecuteBefore', [undefined, data.executionData]); + await lifecycleHooks.executeHook('workflowExecuteBefore', [undefined, data.executionData]); } catch (error) { - // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the + // We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the // "workflowExecuteAfter" which we require. - const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + const hooks = executionHooksFactory.forExecutionOnWorker( data.executionMode, executionId, data.workflowData, @@ -387,9 +391,9 @@ export class WorkflowRunner { onCancel(async () => { await this.scalingService.stopJob(job); - // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the + // We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the // "workflowExecuteAfter" which we require. - const hooksWorker = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + const hooksWorker = executionHooksFactory.forExecutionOnWorker( data.executionMode, executionId, data.workflowData, @@ -405,9 +409,9 @@ export class WorkflowRunner { try { await job.finished(); } catch (error) { - // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the + // We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the // "workflowExecuteAfter" which we require. - const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + const hooks = executionHooksFactory.forExecutionOnWorker( data.executionMode, executionId, data.workflowData, @@ -440,7 +444,7 @@ export class WorkflowRunner { // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed. - await hooks.executeHookFunctions('workflowExecuteAfter', [runData]); + await lifecycleHooks.executeHook('workflowExecuteAfter', [runData]); resolve(runData); }, @@ -454,4 +458,172 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); } + + async runSubWorkflow( + additionalData: IWorkflowExecuteAdditionalData, + options: ExecuteWorkflowOptions, + executionId: string, + runData: IWorkflowExecutionDataProcess, + workflowData: IWorkflowBase, + ): Promise { + const { activeExecutions, externalHooks, eventService, executionRepository, nodeTypes } = this; + + await externalHooks.init(); + + const workflowName = workflowData ? workflowData.name : undefined; + const workflow = new Workflow({ + id: workflowData.id, + name: workflowName, + nodes: workflowData.nodes, + connections: workflowData.connections, + active: workflowData.active, + nodeTypes, + staticData: workflowData.staticData, + settings: workflowData.settings, + }); + + /** + * A subworkflow execution in queue mode is not enqueued, but rather runs in the + * same worker process as the parent execution. Hence ensure the subworkflow + * execution is marked as started as well. + */ + await executionRepository.setRunning(executionId); + + eventService.emit('workflow-pre-execute', { executionId, data: runData }); + + let data; + try { + await this.permissionChecker.check(workflowData.id, workflowData.nodes); + await this.subworkflowPolicyChecker.check( + workflow, + options.parentWorkflowId, + options.node, + additionalData.userId, + ); + + const executionHooksFactory = Container.get(ExecutionHooksFactory); + + // Create new additionalData to have different workflow loaded and to call + // different webhooks + const additionalDataIntegrated = await WorkflowExecuteAdditionalData.getBase(); + additionalDataIntegrated.hooks = executionHooksFactory.forSubExecution( + runData.executionMode, + executionId, + workflowData, + { pushRef: runData.pushRef, retryOf: runData.retryOf }, + ); + additionalDataIntegrated.executionId = executionId; + additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager; + + // Make sure we pass on the original executeWorkflow function we received + // This one already contains changes to talk to parent process + // and get executionID from `activeExecutions` running on main process + additionalDataIntegrated.executeSubWorkflow = additionalData.executeSubWorkflow; + + let subworkflowTimeout = additionalData.executionTimeoutTimestamp; + const workflowSettings = workflowData.settings; + if ( + workflowSettings?.executionTimeout !== undefined && + workflowSettings.executionTimeout > 0 + ) { + // We might have received a max timeout timestamp from the parent workflow + // If we did, then we get the minimum time between the two timeouts + // If no timeout was given from the parent, then we use our timeout. + subworkflowTimeout = Math.min( + additionalData.executionTimeoutTimestamp || Number.MAX_SAFE_INTEGER, + Date.now() + workflowSettings.executionTimeout * 1000, + ); + } + + additionalDataIntegrated.executionTimeoutTimestamp = subworkflowTimeout; + + // Execute the workflow + const workflowExecute = new WorkflowExecute( + additionalDataIntegrated, + runData.executionMode, + runData.executionData, + ); + const execution = workflowExecute.processRunExecutionData(workflow); + activeExecutions.attachWorkflowExecution(executionId, execution); + data = await execution; + } catch (error) { + const executionError = error ? (error as ExecutionError) : undefined; + const fullRunData: IRun = { + data: { + resultData: { + error: executionError, + runData: {}, + }, + }, + finished: false, + mode: 'integrated', + startedAt: new Date(), + stoppedAt: new Date(), + status: 'error', + }; + // When failing, we might not have finished the execution + // Therefore, database might not contain finished errors. + // Force an update to db as there should be no harm doing this + + const fullExecutionData: UpdateExecutionPayload = { + data: fullRunData.data, + mode: fullRunData.mode, + finished: fullRunData.finished ? fullRunData.finished : false, + startedAt: fullRunData.startedAt, + stoppedAt: fullRunData.stoppedAt, + status: fullRunData.status, + workflowData, + workflowId: workflowData.id, + }; + if (workflowData.id) { + fullExecutionData.workflowId = workflowData.id; + } + + activeExecutions.finalizeExecution(executionId, fullRunData); + + await executionRepository.updateExistingExecution(executionId, fullExecutionData); + throw objectToError( + { + ...executionError, + stack: executionError?.stack, + message: executionError?.message, + }, + workflow, + ); + } + + await this.externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); + + eventService.emit('workflow-post-execute', { + workflow: workflowData, + executionId, + userId: additionalData.userId, + runData: data, + }); + + // subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here + if (data.finished === true || data.status === 'waiting') { + // Workflow did finish successfully + + activeExecutions.finalizeExecution(executionId, data); + const returnData = getDataLastExecutedNodeData(data); + return { + executionId, + data: returnData!.data!.main, + waitTill: data.waitTill, + }; + } + activeExecutions.finalizeExecution(executionId, data); + + // Workflow did fail + const { error } = data.data.resultData; + + throw objectToError( + { + ...error, + stack: error?.stack, + }, + workflow, + ); + } } diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index 2963e46185..3d78bf66c0 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -4,6 +4,8 @@ import type { ValidationResult, } from 'n8n-workflow'; +import type { ExecutionHooks } from '@/execution-hooks'; + export type Class = new (...args: A) => T; export interface IResponseError extends Error { @@ -35,4 +37,10 @@ export namespace n8n { } } +declare module 'n8n-workflow' { + interface IWorkflowExecuteAdditionalData { + hooks?: ExecutionHooks; + } +} + export type ExtendedValidationResult = ValidationResult & { fieldName?: string }; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 3a0ebf22f9..cffb3f1e8c 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -773,7 +773,7 @@ export async function proxyRequestToAxios( } else if (body === '') { body = axiosConfig.responseType === 'arraybuffer' ? Buffer.alloc(0) : undefined; } - await additionalData?.hooks?.executeHookFunctions('nodeFetchedData', [workflow?.id, node]); + await additionalData?.hooks?.executeHook('nodeFetchedData', [workflow?.id, node]); return configObject.resolveWithFullResponse ? { body, diff --git a/packages/core/src/TriggersAndPollers.ts b/packages/core/src/TriggersAndPollers.ts index b77926e136..c36a3a2899 100644 --- a/packages/core/src/TriggersAndPollers.ts +++ b/packages/core/src/TriggersAndPollers.ts @@ -46,6 +46,7 @@ export class TriggersAndPollers { // Add the manual trigger response which resolves when the first time data got emitted triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => { + const hooks = additionalData.hooks!; triggerFunctions.emit = ( (resolveEmit) => ( @@ -53,19 +54,12 @@ export class TriggersAndPollers { responsePromise?: IDeferredPromise, donePromise?: IDeferredPromise, ) => { - additionalData.hooks!.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - if (responsePromise) { - responsePromise.resolve(response); - } - }, - ]; - + if (responsePromise) { + hooks.addHook('sendResponse', async (response) => responsePromise.resolve(response)); + } if (donePromise) { - additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift( - async (runData: IRun): Promise => { - return donePromise.resolve(runData); - }, + hooks.addHook('workflowExecuteAfter', async (runData) => + donePromise.resolve(runData), ); } @@ -75,13 +69,9 @@ export class TriggersAndPollers { triggerFunctions.emitError = ( (rejectEmit) => (error: Error, responsePromise?: IDeferredPromise) => { - additionalData.hooks!.hookFunctions.sendResponse = [ - async (): Promise => { - if (responsePromise) { - responsePromise.reject(error); - } - }, - ]; + if (responsePromise) { + hooks.addHook('sendResponse', async () => responsePromise.reject(error)); + } rejectEmit(error); } diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index f340c8da67..798b80a49d 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -404,19 +404,6 @@ export class WorkflowExecute { return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); } - /** - * Executes the hook with the given name - * - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - async executeHook(hookName: string, parameters: any[]): Promise { - if (this.additionalData.hooks === undefined) { - return; - } - - return await this.additionalData.hooks.executeHookFunctions(hookName, parameters); - } - moveNodeMetadata(): void { const metadata = get(this.runExecutionData, 'executionData.metadata'); @@ -1234,14 +1221,17 @@ export class WorkflowExecute { this.status = 'canceled'; this.abortController.abort(); const fullRunData = this.getFullRunData(startedAt); - void this.executeHook('workflowExecuteAfter', [fullRunData]); + void this.additionalData.hooks?.executeHook('workflowExecuteAfter', [fullRunData]); }); // eslint-disable-next-line complexity const returnPromise = (async () => { try { if (!this.additionalData.restartExecutionId) { - await this.executeHook('workflowExecuteBefore', [workflow, this.runExecutionData]); + await this.additionalData.hooks?.executeHook('workflowExecuteBefore', [ + workflow, + this.runExecutionData, + ]); } } catch (error) { const e = error as unknown as ExecutionBaseError; @@ -1325,7 +1315,7 @@ export class WorkflowExecute { node: executionNode.name, workflowId: workflow.id, }); - await this.executeHook('nodeExecuteBefore', [executionNode.name]); + await this.additionalData.hooks?.executeHook('nodeExecuteBefore', [executionNode.name]); // Get the index of the current run runIndex = 0; @@ -1774,7 +1764,7 @@ export class WorkflowExecute { this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); // Only execute the nodeExecuteAfter hook if the node did not get aborted if (!this.isCancelled) { - await this.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1816,7 +1806,7 @@ export class WorkflowExecute { this.runExecutionData.resultData.runData[executionNode.name].push(taskData); if (this.runExecutionData.waitTill) { - await this.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1835,7 +1825,7 @@ export class WorkflowExecute { ) { // Before stopping, make sure we are executing hooks so // That frontend is notified for example for manual executions. - await this.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1945,7 +1935,7 @@ export class WorkflowExecute { // Execute hooks now to make sure that all hooks are executed properly // Await is needed to make sure that we don't fall into concurrency problems // When saving node execution data - await this.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -2148,12 +2138,14 @@ export class WorkflowExecute { this.moveNodeMetadata(); - await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch( - // eslint-disable-next-line @typescript-eslint/no-shadow - (error) => { - console.error('There was a problem running hook "workflowExecuteAfter"', error); - }, - ); + await this.additionalData.hooks + ?.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]) + .catch( + // eslint-disable-next-line @typescript-eslint/no-shadow + (error) => { + console.error('There was a problem running hook "workflowExecuteAfter"', error); + }, + ); if (closeFunction) { try { @@ -2220,7 +2212,10 @@ export class WorkflowExecute { this.moveNodeMetadata(); // Prevent from running the hook if the error is an abort error as it was already handled if (!this.isCancelled) { - await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await this.additionalData.hooks?.executeHook('workflowExecuteAfter', [ + fullRunData, + newStaticData, + ]); } if (closeFunction) { diff --git a/packages/core/src/__tests__/execution-hooks.test.ts b/packages/core/src/__tests__/execution-hooks.test.ts new file mode 100644 index 0000000000..77d90ad2f0 --- /dev/null +++ b/packages/core/src/__tests__/execution-hooks.test.ts @@ -0,0 +1,114 @@ +import { mock } from 'jest-mock-extended'; +import type { + IDataObject, + IExecuteResponsePromiseData, + INode, + IRun, + IRunExecutionData, + ITaskData, + IWorkflowBase, + Workflow, +} from 'n8n-workflow'; + +import type { ExecutionHookName, RegisteredHooks } from '@/execution-hooks'; +import { ExecutionHooks } from '@/execution-hooks'; + +describe('ExecutionHooks', () => { + const executionId = '123'; + const pushRef = 'test-ref'; + const retryOf = 'test-retry'; + const workflowData = mock(); + + let hooks: ExecutionHooks; + beforeEach(() => { + jest.clearAllMocks(); + hooks = new ExecutionHooks('internal', executionId, workflowData, { + pushRef, + retryOf, + }); + }); + + describe('constructor()', () => { + it('should initialize with correct properties', () => { + expect(hooks.mode).toBe('internal'); + expect(hooks.executionId).toBe(executionId); + expect(hooks.workflowData).toBe(workflowData); + expect(hooks.pushRef).toBe(pushRef); + expect(hooks.retryOf).toBe(retryOf); + // @ts-expect-error private property + expect(hooks.registered).toEqual({ + nodeExecuteAfter: [], + nodeExecuteBefore: [], + nodeFetchedData: [], + sendResponse: [], + workflowExecuteAfter: [], + workflowExecuteBefore: [], + }); + }); + }); + + describe('addHook()', () => { + const hooksHandler = + mock<{ + [K in keyof RegisteredHooks]: RegisteredHooks[K][number]; + }>(); + + const testCases: Array<{ hook: ExecutionHookName; args: unknown[] }> = [ + { hook: 'nodeExecuteBefore', args: ['testNode'] }, + { + hook: 'nodeExecuteAfter', + args: ['testNode', mock(), mock()], + }, + { hook: 'workflowExecuteBefore', args: [mock(), mock()] }, + { hook: 'workflowExecuteAfter', args: [mock(), mock()] }, + { hook: 'sendResponse', args: [mock()] }, + { hook: 'nodeFetchedData', args: ['workflow123', mock()] }, + ]; + + test.each(testCases)('should add and process $hook hooks', async ({ hook, args }) => { + hooks.addHook(hook, hooksHandler[hook]); + await hooks.executeHook(hook, args); + expect(hooksHandler[hook]).toHaveBeenCalledWith(...args); + }); + }); + + describe('executeHook()', () => { + it('should execute multiple hooks in order', async () => { + const executionOrder: string[] = []; + const hook1 = jest.fn().mockImplementation(async () => { + executionOrder.push('hook1'); + }); + const hook2 = jest.fn().mockImplementation(async () => { + executionOrder.push('hook2'); + }); + + hooks.addHook('nodeExecuteBefore', hook1, hook2); + await hooks.executeHook('nodeExecuteBefore', ['testNode']); + + expect(executionOrder).toEqual(['hook1', 'hook2']); + expect(hook1).toHaveBeenCalled(); + expect(hook2).toHaveBeenCalled(); + }); + + it('should maintain correct "this" context', async () => { + const hook = jest.fn().mockImplementation(async function (this: ExecutionHooks) { + expect(this.executionId).toBe(executionId); + expect(this.mode).toBe('internal'); + }); + + hooks.addHook('nodeExecuteBefore', hook); + await hooks.executeHook('nodeExecuteBefore', ['testNode']); + + expect(hook).toHaveBeenCalled(); + }); + + it('should handle errors in hooks', async () => { + const errorHook = jest.fn().mockRejectedValue(new Error('Hook failed')); + hooks.addHook('nodeExecuteBefore', errorHook); + + await expect(hooks.executeHook('nodeExecuteBefore', ['testNode'])).rejects.toThrow( + 'Hook failed', + ); + }); + }); +}); diff --git a/packages/core/src/execution-hooks.ts b/packages/core/src/execution-hooks.ts new file mode 100644 index 0000000000..314ac34185 --- /dev/null +++ b/packages/core/src/execution-hooks.ts @@ -0,0 +1,119 @@ +import type { + IDataObject, + IExecuteResponsePromiseData, + INode, + IRun, + IRunExecutionData, + ITaskData, + IWorkflowBase, + Workflow, + WorkflowExecuteMode, +} from 'n8n-workflow'; + +/** This defines the types of hooks that can be executed at different stages of a workflow execution. */ +export interface RegisteredHooks { + /** Executed before a node starts executing */ + nodeExecuteBefore: Array<(this: ExecutionHooks, nodeName: string) => Promise>; + + /** Executed after a node finishes executing */ + nodeExecuteAfter: Array< + ( + this: ExecutionHooks, + nodeName: string, + data: ITaskData, + executionData: IRunExecutionData, + ) => Promise + >; + + /** Executed before workflow execution starts */ + workflowExecuteBefore: Array< + (this: ExecutionHooks, workflow?: Workflow, data?: IRunExecutionData) => Promise + >; + + /** Executed after workflow execution completes */ + workflowExecuteAfter: Array< + (this: ExecutionHooks, data: IRun, newStaticData: IDataObject) => Promise + >; + + /** Used by trigger and webhook nodes to respond back to the request */ + sendResponse: Array< + (this: ExecutionHooks, response: IExecuteResponsePromiseData) => Promise + >; + + /** Executed when a node fetches data */ + nodeFetchedData: Array<(this: ExecutionHooks, workflowId: string, node: INode) => Promise>; +} + +export type ExecutionHookName = keyof RegisteredHooks; + +export interface ExecutionHooksOptionalParameters { + retryOf?: string; + pushRef?: string; +} + +/** + * This class serves as a container for execution lifecycle hooks that get triggered during different stages of an execution. + * It manages and executes callback functions registered for specific execution events. + * + * Common use cases include: + * - Saving execution progress to database + * - Pushing execution status updates to the frontend UI + * - Recording workflow statistics + * - Running external hooks for execution events + * - Error and Cancellation handling and cleanup + * + * Example usage: + * ```typescript + * const hooks = new ExecutionHooks(mode, executionId, workflowData); + * hooks.add('workflowExecuteAfter, async function(fullRunData) { + * await saveToDatabase(this.executionId, fullRunData); + *}); + * ``` + */ +export class ExecutionHooks { + pushRef?: string; + + retryOf?: string; + + private readonly registered: RegisteredHooks = { + nodeExecuteAfter: [], + nodeExecuteBefore: [], + nodeFetchedData: [], + sendResponse: [], + workflowExecuteAfter: [], + workflowExecuteBefore: [], + }; + + constructor( + readonly mode: WorkflowExecuteMode, + readonly executionId: string, + readonly workflowData: IWorkflowBase, + optionalParameters: ExecutionHooksOptionalParameters = {}, + ) { + this.pushRef = optionalParameters.pushRef; + // retryOf might be `null` from TypeORM + this.retryOf = optionalParameters.retryOf ?? undefined; + } + + async executeHook< + Hook extends keyof RegisteredHooks, + Params extends unknown[] = Parameters[number]>, + >(hookName: Hook, parameters: Params) { + const hooks = this.registered[hookName]; + for (const hookFunction of hooks) { + const typedHookFunction = hookFunction as unknown as ( + this: ExecutionHooks, + ...args: Params + ) => Promise; + await typedHookFunction.apply(this, parameters); + } + } + + addHook( + hookName: Hook, + ...hookFunctions: Array + ): void { + // @ts-expect-error FIX THIS + this.registered[hookName].push(...hookFunctions); + } +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 7abbd9ad9a..b9fe98459b 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -12,6 +12,7 @@ export * from './DirectoryLoader'; export * from './Interfaces'; export { InstanceSettings, InstanceType } from './InstanceSettings'; export { Logger } from './logging/logger'; +export { ExecutionHooks, ExecutionHooksOptionalParameters } from './execution-hooks'; export * from './NodeExecuteFunctions'; export * from './RoutingNode'; export * from './WorkflowExecute'; diff --git a/packages/core/src/node-execution-context/__tests__/shared-tests.ts b/packages/core/src/node-execution-context/__tests__/shared-tests.ts index 9992507bdd..1558a82f1f 100644 --- a/packages/core/src/node-execution-context/__tests__/shared-tests.ts +++ b/packages/core/src/node-execution-context/__tests__/shared-tests.ts @@ -210,7 +210,7 @@ export const describeCommonTests = ( }; it('should execute workflow and return data', async () => { - additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData); + additionalData.executeSubWorkflow.mockResolvedValue(executeWorkflowData); binaryDataService.duplicateBinaryData.mockResolvedValue(data); const result = await context.executeWorkflow(workflowInfo, undefined, undefined, { @@ -227,7 +227,7 @@ export const describeCommonTests = ( it('should put execution to wait if waitTill is returned', async () => { const waitTill = new Date(); - additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill }); + additionalData.executeSubWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill }); binaryDataService.duplicateBinaryData.mockResolvedValue(data); const result = await context.executeWorkflow(workflowInfo, undefined, undefined, { diff --git a/packages/core/src/node-execution-context/base-execute-context.ts b/packages/core/src/node-execution-context/base-execute-context.ts index 24b9e89301..b80dcff165 100644 --- a/packages/core/src/node-execution-context/base-execute-context.ts +++ b/packages/core/src/node-execution-context/base-execute-context.ts @@ -117,7 +117,7 @@ export class BaseExecuteContext extends NodeExecutionContext { parentExecution?: RelatedExecution; }, ): Promise { - const result = await this.additionalData.executeWorkflow(workflowInfo, this.additionalData, { + const result = await this.additionalData.executeSubWorkflow(workflowInfo, this.additionalData, { ...options, parentWorkflowId: this.workflow.id, inputData, diff --git a/packages/core/src/node-execution-context/execute-context.ts b/packages/core/src/node-execution-context/execute-context.ts index f3e32608f3..4c32345cb1 100644 --- a/packages/core/src/node-execution-context/execute-context.ts +++ b/packages/core/src/node-execution-context/execute-context.ts @@ -192,7 +192,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti } async sendResponse(response: IExecuteResponsePromiseData): Promise { - await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]); + await this.additionalData.hooks?.executeHook('sendResponse', [response]); } /** @deprecated use ISupplyDataFunctions.addInputData */ diff --git a/packages/core/src/node-execution-context/supply-data-context.ts b/packages/core/src/node-execution-context/supply-data-context.ts index be1f63b56c..d92b788a2c 100644 --- a/packages/core/src/node-execution-context/supply-data-context.ts +++ b/packages/core/src/node-execution-context/supply-data-context.ts @@ -256,12 +256,12 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData } runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData; - await additionalData.hooks?.executeHookFunctions('nodeExecuteBefore', [nodeName]); + await additionalData.hooks?.executeHook('nodeExecuteBefore', [nodeName]); } else { // Outputs taskData.executionTime = new Date().getTime() - taskData.startTime; - await additionalData.hooks?.executeHookFunctions('nodeExecuteAfter', [ + await additionalData.hooks?.executeHook('nodeExecuteAfter', [ nodeName, taskData, this.runExecutionData, diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts index b1b6e96577..f0fe7140a7 100644 --- a/packages/core/test/NodeExecuteFunctions.test.ts +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -12,7 +12,6 @@ import type { ITaskDataConnections, IWorkflowExecuteAdditionalData, Workflow, - WorkflowHooks, } from 'n8n-workflow'; import nock from 'nock'; import { tmpdir } from 'os'; @@ -22,6 +21,7 @@ import type { SecureContextOptions } from 'tls'; import Container from 'typedi'; import { BinaryDataService } from '@/BinaryData/BinaryData.service'; +import type { ExecutionHooks } from '@/execution-hooks'; import { InstanceSettings } from '@/InstanceSettings'; import { binaryToString, @@ -401,12 +401,12 @@ describe('NodeExecuteFunctions', () => { describe('proxyRequestToAxios', () => { const baseUrl = 'http://example.de'; const workflow = mock(); - const hooks = mock(); + const hooks = mock(); const additionalData = mock({ hooks }); const node = mock(); beforeEach(() => { - hooks.executeHookFunctions.mockClear(); + hooks.executeHook.mockClear(); }); test('should rethrow an error with `status` property', async () => { @@ -422,10 +422,7 @@ describe('NodeExecuteFunctions', () => { test('should not throw if the response status is 200', async () => { nock(baseUrl).get('/test').reply(200); await proxyRequestToAxios(workflow, additionalData, node, `${baseUrl}/test`); - expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [ - workflow.id, - node, - ]); + expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); test('should throw if the response status is 403', async () => { @@ -445,7 +442,7 @@ describe('NodeExecuteFunctions', () => { expect(error.config).toBeUndefined(); expect(error.message).toEqual('403 - "Forbidden"'); } - expect(hooks.executeHookFunctions).not.toHaveBeenCalled(); + expect(hooks.executeHook).not.toHaveBeenCalled(); }); test('should not throw if the response status is 404, but `simple` option is set to `false`', async () => { @@ -456,10 +453,7 @@ describe('NodeExecuteFunctions', () => { }); expect(response).toEqual('Not Found'); - expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [ - workflow.id, - node, - ]); + expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); test('should return full response when `resolveWithFullResponse` is set to true', async () => { @@ -476,10 +470,7 @@ describe('NodeExecuteFunctions', () => { statusCode: 404, statusMessage: null, }); - expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [ - workflow.id, - node, - ]); + expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); describe('redirects', () => { diff --git a/packages/core/test/TriggersAndPollers.test.ts b/packages/core/test/TriggersAndPollers.test.ts index 27cc8b47d9..040938a9b2 100644 --- a/packages/core/test/TriggersAndPollers.test.ts +++ b/packages/core/test/TriggersAndPollers.test.ts @@ -9,10 +9,10 @@ import type { INodeType, INodeTypes, ITriggerFunctions, - WorkflowHooks, IRun, } from 'n8n-workflow'; +import { ExecutionHooks } from '@/execution-hooks'; import { TriggersAndPollers } from '@/TriggersAndPollers'; describe('TriggersAndPollers', () => { @@ -23,15 +23,8 @@ describe('TriggersAndPollers', () => { }); const nodeTypes = mock(); const workflow = mock({ nodeTypes }); - const hookFunctions = mock({ - sendResponse: [], - workflowExecuteAfter: [], - }); - const additionalData = mock({ - hooks: { - hookFunctions, - }, - }); + const hooks = new ExecutionHooks('internal', '123', mock()); + const additionalData = mock({ hooks }); const triggersAndPollers = new TriggersAndPollers(); beforeEach(() => { @@ -98,8 +91,7 @@ describe('TriggersAndPollers', () => { getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise); - expect(hookFunctions.sendResponse?.length).toBe(1); - await hookFunctions.sendResponse![0]?.({ testResponse: true }); + await hooks.executeHook('sendResponse', [{ testResponse: true }]); expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); }); @@ -111,10 +103,10 @@ describe('TriggersAndPollers', () => { await runTriggerHelper('manual'); getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise, donePromise); - await hookFunctions.sendResponse![0]?.({ testResponse: true }); + await hooks.executeHook('sendResponse', [{ testResponse: true }]); expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); - await hookFunctions.workflowExecuteAfter?.[0]?.(mockRunData, {}); + await hooks.executeHook('workflowExecuteAfter', [mockRunData, {}]); expect(donePromise.resolve).toHaveBeenCalledWith(mockRunData); }); }); diff --git a/packages/core/test/helpers/index.ts b/packages/core/test/helpers/index.ts index 14f19789b0..47eb064c43 100644 --- a/packages/core/test/helpers/index.ts +++ b/packages/core/test/helpers/index.ts @@ -6,7 +6,6 @@ import type { INodeType, INodeTypes, IRun, - ITaskData, IVersionedNodeType, IWorkflowBase, IWorkflowExecuteAdditionalData, @@ -14,10 +13,11 @@ import type { WorkflowTestData, INodeTypeData, } from 'n8n-workflow'; -import { ApplicationError, NodeHelpers, WorkflowHooks } from 'n8n-workflow'; +import { ApplicationError, NodeHelpers } from 'n8n-workflow'; import path from 'path'; import { UnrecognizedNodeTypeError } from '@/errors'; +import { ExecutionHooks } from '@/execution-hooks'; import { predefinedNodesTypes } from './constants'; @@ -53,22 +53,14 @@ export function WorkflowExecuteAdditionalData( waitPromise: IDeferredPromise, nodeExecutionOrder: string[], ): IWorkflowExecuteAdditionalData { - const hookFunctions = { - nodeExecuteAfter: [ - async (nodeName: string, _data: ITaskData): Promise => { - nodeExecutionOrder.push(nodeName); - }, - ], - workflowExecuteAfter: [ - async (fullRunData: IRun): Promise => { - waitPromise.resolve(fullRunData); - }, - ], - }; - - return mock({ - hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()), + const hooks = new ExecutionHooks('trigger', '1', mock()); + hooks.addHook('nodeExecuteAfter', async (nodeName): Promise => { + nodeExecutionOrder.push(nodeName); }); + hooks.addHook('workflowExecuteAfter', async (fullRunData): Promise => { + waitPromise.resolve(fullRunData); + }); + return mock({ hooks }); } const preparePinData = (pinData: IDataObject) => { diff --git a/packages/nodes-base/test/nodes/Helpers.ts b/packages/nodes-base/test/nodes/Helpers.ts index 19e4f72b40..22b418ae5e 100644 --- a/packages/nodes-base/test/nodes/Helpers.ts +++ b/packages/nodes-base/test/nodes/Helpers.ts @@ -7,6 +7,7 @@ import { Credentials, UnrecognizedNodeTypeError, constructExecutionMetaData, + ExecutionHooks, } from 'n8n-core'; import type { CredentialLoadingDetails, @@ -27,14 +28,13 @@ import type { INodeTypeData, INodeTypes, IRun, - ITaskData, IVersionedNodeType, IWorkflowBase, IWorkflowExecuteAdditionalData, NodeLoadingDetails, WorkflowTestData, } from 'n8n-workflow'; -import { ApplicationError, ICredentialsHelper, NodeHelpers, WorkflowHooks } from 'n8n-workflow'; +import { ApplicationError, ICredentialsHelper, NodeHelpers } from 'n8n-workflow'; import nock from 'nock'; import { tmpdir } from 'os'; import path from 'path'; @@ -156,22 +156,17 @@ export function WorkflowExecuteAdditionalData( waitPromise: IDeferredPromise, nodeExecutionOrder: string[], ): IWorkflowExecuteAdditionalData { - const hookFunctions = { - nodeExecuteAfter: [ - async (nodeName: string, _data: ITaskData): Promise => { - nodeExecutionOrder.push(nodeName); - }, - ], - workflowExecuteAfter: [ - async (fullRunData: IRun): Promise => { - waitPromise.resolve(fullRunData); - }, - ], - }; + const hooks = new ExecutionHooks('trigger', '1', mock()); + hooks.addHook('nodeExecuteAfter', async (nodeName) => { + nodeExecutionOrder.push(nodeName); + }); + hooks.addHook('workflowExecuteAfter', async (fullRunData) => { + waitPromise.resolve(fullRunData); + }); return mock({ credentialsHelper: new CredentialsHelper(), - hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()), + hooks, // Get from node.parameters currentNodeParameters: undefined, }); diff --git a/packages/nodes-base/test/nodes/TriggerHelpers.ts b/packages/nodes-base/test/nodes/TriggerHelpers.ts index 7e04dcaa86..bfe8a00eec 100644 --- a/packages/nodes-base/test/nodes/TriggerHelpers.ts +++ b/packages/nodes-base/test/nodes/TriggerHelpers.ts @@ -3,7 +3,8 @@ import { mock } from 'jest-mock-extended'; import get from 'lodash/get'; import merge from 'lodash/merge'; import set from 'lodash/set'; -import { PollContext, returnJsonArray, type InstanceSettings } from 'n8n-core'; +import { PollContext, returnJsonArray } from 'n8n-core'; +import type { InstanceSettings, ExecutionHooks } from 'n8n-core'; import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager'; import type { IBinaryData, @@ -19,7 +20,6 @@ import type { NodeTypeAndVersion, VersionedNodeType, Workflow, - WorkflowHooks, } from 'n8n-workflow'; type MockDeepPartial = Parameters>[0]; @@ -212,7 +212,7 @@ export async function testPollingTriggerNode( return options as IHttpRequestOptions; }, }), - hooks: mock(), + hooks: mock(), }), mode, 'init', diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index a67f7b59b4..fec74cf321 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -24,7 +24,6 @@ import type { ExecutionStatus } from './ExecutionStatus'; import type { Result } from './result'; import type { Workflow } from './Workflow'; import type { EnvProviderState } from './WorkflowDataProxyEnvProvider'; -import type { WorkflowHooks } from './WorkflowHooks'; export interface IAdditionalCredentialOptions { oauth2?: IOAuth2Options; @@ -900,6 +899,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { // TODO: Create later own type only for Config-Nodes export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & BaseExecutionFunctions & { + /** This executes sub-workflows */ executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], @@ -2241,17 +2241,6 @@ export interface IWorkflowCredentials { }; } -export interface IWorkflowExecuteHooks { - [key: string]: Array<(...args: any[]) => Promise> | undefined; - nodeExecuteAfter?: Array< - (nodeName: string, data: ITaskData, executionData: IRunExecutionData) => Promise - >; - nodeExecuteBefore?: Array<(nodeName: string) => Promise>; - workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise>; - workflowExecuteBefore?: Array<(workflow?: Workflow, data?: IRunExecutionData) => Promise>; - sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; -} - export interface IWorkflowExecutionDataProcess { destinationNode?: string; restartExecutionId?: string; @@ -2324,14 +2313,13 @@ type AiEventPayload = { export interface IWorkflowExecuteAdditionalData { credentialsHelper: ICredentialsHelper; - executeWorkflow: ( + executeSubWorkflow: ( workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, options: ExecuteWorkflowOptions, ) => Promise; executionId?: string; restartExecutionId?: string; - hooks?: WorkflowHooks; httpResponse?: express.Response; httpRequest?: express.Request; restApiUrl: string; @@ -2388,11 +2376,6 @@ export type WorkflowActivateMode = | 'manual' // unused | 'leadershipChange'; -export interface IWorkflowHooksOptionalParameters { - retryOf?: string; - pushRef?: string; -} - export namespace WorkflowSettings { export type CallerPolicy = 'any' | 'none' | 'workflowsFromAList' | 'workflowsFromSameOwner'; export type SaveDataExecution = 'DEFAULT' | 'all' | 'none'; diff --git a/packages/workflow/src/WorkflowHooks.ts b/packages/workflow/src/WorkflowHooks.ts deleted file mode 100644 index feb7a46e20..0000000000 --- a/packages/workflow/src/WorkflowHooks.ts +++ /dev/null @@ -1,49 +0,0 @@ -import type { - IWorkflowBase, - IWorkflowExecuteHooks, - IWorkflowHooksOptionalParameters, - WorkflowExecuteMode, -} from './Interfaces'; - -export class WorkflowHooks { - mode: WorkflowExecuteMode; - - workflowData: IWorkflowBase; - - executionId: string; - - pushRef?: string; - - retryOf?: string; - - hookFunctions: IWorkflowExecuteHooks; - - constructor( - hookFunctions: IWorkflowExecuteHooks, - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, - ) { - // eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing - optionalParameters = optionalParameters || {}; - - this.hookFunctions = hookFunctions; - this.mode = mode; - this.executionId = executionId; - this.workflowData = workflowData; - this.pushRef = optionalParameters.pushRef; - // retryOf might be `null` from TypeORM - this.retryOf = optionalParameters.retryOf ?? undefined; - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - async executeHookFunctions(hookName: string, parameters: any[]) { - const hooks = this.hookFunctions[hookName]; - if (hooks !== undefined && Array.isArray(hooks)) { - for (const hookFunction of hooks) { - await hookFunction.apply(this, parameters); - } - } - } -} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 992a27921f..a2e7dc4856 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -17,7 +17,6 @@ export * from './NodeHelpers'; export * from './Workflow'; export * from './WorkflowDataProxy'; export * from './WorkflowDataProxyEnvProvider'; -export * from './WorkflowHooks'; export * from './VersionedNodeType'; export * from './TypeValidation'; export * from './result';