refactor(core): Move WorkflowHooks to core, improve type-safety, and add tests

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-12-23 14:39:41 +01:00
parent f754b22a3f
commit 80180230b1
No known key found for this signature in database
33 changed files with 1890 additions and 1171 deletions

View file

@ -17,13 +17,15 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { VariablesService } from '@/environments.ee/variables/variables.service.ee';
import { EventService } from '@/events/event.service';
import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory';
import { ExternalHooks } from '@/external-hooks';
import { Push } from '@/push';
import { SecretsHelper } from '@/secrets-helpers.ee';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service';
import { Telemetry } from '@/telemetry';
import { PermissionChecker } from '@/user-management/permission-checker';
import { executeWorkflow, getBase, getRunData } from '@/workflow-execute-additional-data';
import { executeSubWorkflow, getBase, getRunData } from '@/workflow-execute-additional-data';
import { mockInstance } from '@test/mocking';
const EXECUTION_ID = '123';
@ -78,6 +80,8 @@ jest.mock('n8n-core', () => ({
}));
describe('WorkflowExecuteAdditionalData', () => {
mockInstance(Push);
mockInstance(ExecutionHooksFactory);
const variablesService = mockInstance(VariablesService);
variablesService.getAllCached.mockResolvedValue([]);
const credentialsHelper = mockInstance(CredentialsHelper);
@ -128,7 +132,7 @@ describe('WorkflowExecuteAdditionalData', () => {
});
it('should execute workflow, return data and execution id', async () => {
const response = await executeWorkflow(
const response = await executeSubWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),
@ -141,7 +145,7 @@ describe('WorkflowExecuteAdditionalData', () => {
});
it('should execute workflow, skip waiting', async () => {
const response = await executeWorkflow(
const response = await executeSubWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: true }),
@ -154,7 +158,7 @@ describe('WorkflowExecuteAdditionalData', () => {
});
it('should set sub workflow execution as running', async () => {
await executeWorkflow(
await executeSubWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined }),
@ -167,7 +171,7 @@ describe('WorkflowExecuteAdditionalData', () => {
const waitTill = new Date();
runWithData.waitTill = waitTill;
const response = await executeWorkflow(
const response = await executeSubWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),

View file

@ -11,18 +11,15 @@ import type {
IWorkflowExecutionDataProcess,
StartNodeData,
} from 'n8n-workflow';
import {
Workflow,
WorkflowHooks,
type ExecutionError,
type IWorkflowExecuteHooks,
} from 'n8n-workflow';
import { Workflow, type ExecutionError } from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import Container from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { User } from '@/databases/entities/user';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { Telemetry } from '@/telemetry';
import { PermissionChecker } from '@/user-management/permission-checker';
@ -36,25 +33,14 @@ import { setupTestServer } from '@test-integration/utils';
let owner: User;
let runner: WorkflowRunner;
let hookFunctions: IWorkflowExecuteHooks;
setupTestServer({ endpointGroups: [] });
mockInstance(Telemetry);
class Watchers {
workflowExecuteAfter = jest.fn();
}
const watchers = new Watchers();
const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter');
beforeAll(async () => {
owner = await createUser({ role: 'global:owner' });
runner = Container.get(WorkflowRunner);
hookFunctions = {
workflowExecuteAfter: [watchers.workflowExecuteAfter],
};
});
afterAll(() => {
@ -67,48 +53,44 @@ beforeEach(async () => {
});
describe('processError', () => {
let workflow: WorkflowEntity;
let execution: ExecutionEntity;
let hooks: core.ExecutionHooks;
const watcher = mock<{ workflowExecuteAfter: () => Promise<void> }>();
beforeEach(async () => {
jest.clearAllMocks();
workflow = await createWorkflow({}, owner);
execution = await createExecution({ status: 'success', finished: true }, workflow);
hooks = new core.ExecutionHooks('webhook', execution.id, workflow);
hooks.addHook('workflowExecuteAfter', watcher.workflowExecuteAfter);
});
test('processError should return early in Bull stalled edge case', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'queue');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
hooks,
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0);
});
test('processError should return early if the error is `ExecutionNotFoundError`', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution({ status: 'success', finished: true }, workflow);
await runner.processError(
new ExecutionNotFoundError(execution.id),
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
hooks,
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(0);
});
test('processError should process error', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
await Container.get(ActiveExecutions).add(
{ executionMode: 'webhook', workflowData: workflow },
execution.id,
@ -119,9 +101,9 @@ describe('processError', () => {
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
hooks,
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
expect(watcher.workflowExecuteAfter).toHaveBeenCalledTimes(1);
});
});

View file

@ -0,0 +1,785 @@
import { mock } from 'jest-mock-extended';
import type { ErrorReporter, ExecutionHooksOptionalParameters } from 'n8n-core';
import type {
IWorkflowBase,
IWorkflowExecutionDataProcess,
ITaskData,
IRunExecutionData,
IRun,
IDataObject,
NodeOperationError,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { EventService } from '@/events/event.service';
import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory';
import type { ExternalHooks } from '@/external-hooks';
import type { IExecutionResponse } from '@/interfaces';
import type { Push } from '@/push';
import type { ExecutionMetadataService } from '@/services/execution-metadata.service';
import type { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import type { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { mockInstance } from '@test/mocking';
describe('ExecutionHooksFactory', () => {
const errorReporter = mock<ErrorReporter>();
const executionRepository = mockInstance(ExecutionRepository);
const externalHooks = mock<ExternalHooks>();
const workflowStatisticsService = mock<WorkflowStatisticsService>();
const workflowStaticDataService = mock<WorkflowStaticDataService>();
const executionMetadataService = mock<ExecutionMetadataService>();
const eventService = mock<EventService>();
const push = mock<Push>();
const hooksFactory = new ExecutionHooksFactory(
mock(),
errorReporter,
executionRepository,
externalHooks,
workflowStatisticsService,
workflowStaticDataService,
executionMetadataService,
eventService,
push,
);
const workflowId = 'workflow_id';
const executionId = '123';
const pushRef = 'abcd';
const workflowData = mock<IWorkflowBase>({
id: workflowId,
settings: {},
nodes: [],
});
const optionalParameters: ExecutionHooksOptionalParameters = {
retryOf: 'retry123',
pushRef,
};
beforeEach(() => {
jest.clearAllMocks();
workflowData.settings = {};
});
describe('forExecutionOnMain', () => {
const executionData = mock<IWorkflowExecutionDataProcess>({
executionMode: 'manual',
workflowData,
pushRef,
retryOf: 'retry123',
});
const newStaticData: IDataObject = { newKey: 'newValue' };
const fullRunData: IRun = {
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'success',
};
it('should add all required hooks', () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
const registeredHooks = (hooks as any).registered;
expect(registeredHooks.nodeExecuteBefore).toHaveLength(1);
expect(registeredHooks.nodeExecuteAfter).toHaveLength(2);
expect(registeredHooks.workflowExecuteBefore).toHaveLength(2);
expect(registeredHooks.workflowExecuteAfter).toHaveLength(2);
expect(registeredHooks.nodeFetchedData).toHaveLength(0);
expect(registeredHooks.sendResponse).toHaveLength(0);
});
it('should create hooks and execute them correctly', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
// Test workflowExecuteBefore hook
await hooks.executeHook('workflowExecuteBefore', []);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']);
// Test nodeExecuteBefore hook
await hooks.executeHook('nodeExecuteBefore', ['testNode']);
expect(push.send).toHaveBeenCalledWith(
{
type: 'nodeExecuteBefore',
data: { executionId, nodeName: 'testNode' },
},
pushRef,
);
push.send.mockClear();
// Test nodeExecuteAfter hook
const taskData = mock<ITaskData>({});
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };
await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);
expect(push.send).toHaveBeenCalledWith(
{
type: 'nodeExecuteAfter',
data: { executionId, nodeName: 'testNode', data: taskData },
},
pushRef,
);
push.send.mockClear();
// Test workflowExecuteAfter hook
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
expect(push.send).toHaveBeenCalledWith(
{
type: 'executionFinished',
data: {
executionId,
workflowId,
status: 'success',
rawData: '[{"resultData":"1"},{"runData":"2"},{}]',
},
},
pushRef,
);
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
workflowData,
fullRunData,
});
});
it('should handle waiting status in workflowExecuteAfter', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
await hooks.executeHook('workflowExecuteAfter', [
{
...fullRunData,
status: 'waiting',
waitTill: new Date('2099-12-31'),
},
newStaticData,
]);
expect(push.send).toHaveBeenCalledWith(
{ type: 'executionWaiting', data: { executionId } },
pushRef,
);
});
describe('static data', () => {
it('should not update for manual executions', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled();
});
it('should update for non-manual executions', async () => {
const hooks = hooksFactory.forExecutionOnMain(
{
...executionData,
executionMode: 'webhook',
},
executionId,
);
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith(
workflowData.id,
newStaticData,
);
});
});
describe('execution saving', () => {
it('should cleanup manual executions, if saving is disabled', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
// Mock workflow settings to not save manual executions
workflowData.settings = { saveManualExecutions: false };
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
expect(executionRepository.softDelete).toHaveBeenCalledWith(executionId);
});
it('should handle execution progress saving', async () => {
workflowData.settings = { saveExecutionProgress: true };
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
const taskData = mock<ITaskData>({});
const runExecutionData: IRunExecutionData = {
resultData: {
runData: {
testNode: [],
},
},
};
const fullExecutionData = mock<IExecutionResponse>({
finished: false,
data: runExecutionData,
});
executionRepository.findSingleExecution
.calledWith(executionId)
.mockResolvedValue(fullExecutionData);
await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);
expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode');
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
executionId,
fullExecutionData,
);
});
});
describe('error handling', () => {
it('should handle error workflows when execution fails', async () => {
// Set workflow settings to save error executions
workflowData.settings = { saveDataErrorExecution: 'all' };
const hooks = hooksFactory.forExecutionOnMain(
{
...executionData,
executionMode: 'webhook',
},
executionId,
);
const executeErrorWorkflowSpy = jest.spyOn(
WorkflowExecuteAdditionalData,
'executeErrorWorkflow',
);
const failedRunData: IRun = {
...fullRunData,
status: 'error',
data: {
resultData: {
runData: {},
error: mock<NodeOperationError>(),
},
},
};
await hooks.executeHook('workflowExecuteAfter', [failedRunData, newStaticData]);
// Verify error workflow execution
expect(executeErrorWorkflowSpy).toHaveBeenCalledWith(
workflowData,
failedRunData,
'webhook',
executionId,
'retry123',
);
executeErrorWorkflowSpy.mockRestore();
});
it('should handle static data save errors', async () => {
const hooks = hooksFactory.forExecutionOnMain(
{
...executionData,
executionMode: 'webhook',
},
executionId,
);
const error = new Error('Static data save failed');
workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error);
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should handle execution save errors', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
const error = new Error('DB save failed');
executionRepository.updateExistingExecution.mockRejectedValueOnce(error);
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
});
describe('metadata handling', () => {
it('should save execution metadata when present', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
const runDataWithMetadata: IRun = {
...fullRunData,
data: {
resultData: {
runData: {},
metadata: {
someMetadata: 'value',
},
},
},
};
await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]);
expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, {
someMetadata: 'value',
});
});
it('should handle metadata save errors gracefully', async () => {
const hooks = hooksFactory.forExecutionOnMain(executionData, executionId);
const error = new Error('Metadata save failed');
executionMetadataService.save.mockRejectedValueOnce(error);
const runDataWithMetadata: IRun = {
...fullRunData,
data: {
resultData: {
runData: {},
metadata: {
someMetadata: 'value',
},
},
},
};
await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
});
});
describe('forExecutionOnWorker', () => {
it('should add all required hooks', () => {
const hooks = hooksFactory.forExecutionOnWorker(
'manual',
executionId,
workflowData,
optionalParameters,
);
const registeredHooks = (hooks as any).registered;
expect(registeredHooks.nodeExecuteBefore).toHaveLength(0);
expect(registeredHooks.nodeExecuteAfter).toHaveLength(1);
expect(registeredHooks.workflowExecuteBefore).toHaveLength(1);
expect(registeredHooks.workflowExecuteAfter).toHaveLength(1);
expect(registeredHooks.nodeFetchedData).toHaveLength(0);
expect(registeredHooks.sendResponse).toHaveLength(0);
});
it('should run external hook `workflow.preExecute` on workflowExecuteBefore', async () => {
const hooks = hooksFactory.forExecutionOnWorker(
'manual',
executionId,
workflowData,
optionalParameters,
);
await hooks.executeHook('workflowExecuteBefore', []);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']);
});
describe('workflowExecuteAfter', () => {
it('should not delete unfinished executions', async () => {
const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData);
const unfinishedRunData: IRun = {
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'running',
finished: false,
};
await hooks.executeHook('workflowExecuteAfter', [unfinishedRunData]);
expect(executionRepository.hardDelete).not.toHaveBeenCalled();
});
it('should handle successful executions based on save settings', async () => {
// Test when success executions should not be saved
workflowData.settings = { saveDataSuccessExecution: 'none' };
const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData);
const successRunData: IRun = {
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [successRunData]);
expect(executionRepository.hardDelete).toHaveBeenCalledWith({
workflowId,
executionId,
});
});
it('should handle error executions based on save settings', async () => {
// Test when error executions should not be saved
workflowData.settings = { saveDataErrorExecution: 'none' };
const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData);
const errorRunData: IRun = {
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'error',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [errorRunData]);
expect(executionRepository.hardDelete).toHaveBeenCalledWith({
workflowId,
executionId,
});
});
it('should not delete executions when save settings allow it', async () => {
// Test when success executions should be saved
workflowData.settings = { saveDataSuccessExecution: 'all' };
const hooks = hooksFactory.forExecutionOnWorker('manual', executionId, workflowData);
const successRunData: IRun = {
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [successRunData]);
expect(executionRepository.hardDelete).not.toHaveBeenCalled();
});
test.each(['manual', 'trigger', 'webhook', 'schedule'] as WorkflowExecuteMode[])(
'should handle execution mode %s',
async (mode) => {
const hooks = hooksFactory.forExecutionOnWorker(mode, executionId, workflowData);
const runData: IRun = {
data: { resultData: { runData: {} } },
mode,
startedAt: new Date(),
status: 'success',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [runData]);
expect(runData.status).toBe('success');
},
);
});
});
describe('forSubExecution', () => {
it('should add all required hooks', () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const registeredHooks = (hooks as any).registered;
expect(registeredHooks.nodeExecuteBefore).toHaveLength(1);
expect(registeredHooks.nodeExecuteAfter).toHaveLength(2);
expect(registeredHooks.workflowExecuteBefore).toHaveLength(2);
expect(registeredHooks.workflowExecuteAfter).toHaveLength(1);
expect(registeredHooks.nodeFetchedData).toHaveLength(1);
expect(registeredHooks.sendResponse).toHaveLength(0);
});
describe('preExecute hooks', () => {
it('should handle workflow pre-execution', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
await hooks.executeHook('workflowExecuteBefore', []);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [
undefined,
'manual',
]);
});
it('should handle node execution progress saving', async () => {
workflowData.settings = { saveExecutionProgress: true };
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const taskData = mock<ITaskData>({});
const runExecutionData: IRunExecutionData = {
resultData: {
runData: {
testNode: [],
},
},
};
const fullExecutionData = mock<IExecutionResponse>({
finished: false,
data: runExecutionData,
});
executionRepository.findSingleExecution
.calledWith(executionId)
.mockResolvedValue(fullExecutionData);
await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);
expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode');
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
executionId,
fullExecutionData,
);
});
});
describe('event hooks', () => {
it('should emit node execution events', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
await hooks.executeHook('nodeExecuteBefore', ['testNode']);
expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', {
executionId,
workflow: workflowData,
nodeName: 'testNode',
});
await hooks.executeHook('nodeExecuteAfter', ['testNode']);
expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', {
executionId,
workflow: workflowData,
nodeName: 'testNode',
});
});
it('should emit workflow execution events', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
await hooks.executeHook('workflowExecuteBefore', []);
expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', {
executionId,
data: workflowData,
});
});
it('should emit node data fetch events', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const testNode = { name: 'Test Node' };
await hooks.executeHook('nodeFetchedData', [workflowId, testNode]);
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', {
workflowId,
node: testNode,
});
});
});
describe('saving hooks', () => {
it('should handle successful execution saves', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const successRunData: IRun = {
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [successRunData, {}]);
expect(executionRepository.updateExistingExecution).toHaveBeenCalled();
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
workflowData,
fullRunData: successRunData,
});
});
it('should handle error execution saves', async () => {
workflowData.settings = { saveDataErrorExecution: 'all' };
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const errorRunData: IRun = {
data: {
resultData: {
runData: {},
error: mock<NodeOperationError>(),
},
},
mode: 'manual',
startedAt: new Date(),
status: 'error',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [errorRunData, {}]);
expect(executionRepository.updateExistingExecution).toHaveBeenCalled();
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
workflowData,
fullRunData: errorRunData,
});
});
it('should handle metadata updates', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const runDataWithMetadata: IRun = {
data: {
resultData: {
runData: {},
metadata: {
parameter: 'test',
},
},
},
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]);
expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, {
parameter: 'test',
});
});
it('should handle static data updates', async () => {
const hooks = hooksFactory.forSubExecution(
'webhook',
executionId,
workflowData,
optionalParameters,
);
const newStaticData = { newKey: 'newValue' };
await hooks.executeHook('workflowExecuteAfter', [
{
data: { resultData: { runData: {} } },
mode: 'webhook',
startedAt: new Date(),
status: 'success',
finished: true,
},
newStaticData,
]);
expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith(
workflowId,
newStaticData,
);
});
});
describe('error handling', () => {
it('should handle execution save errors', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const error = new Error('Save failed');
executionRepository.updateExistingExecution.mockRejectedValueOnce(error);
await hooks.executeHook('workflowExecuteAfter', [
{
data: { resultData: { runData: {} } },
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
},
{},
]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should handle metadata save errors', async () => {
const hooks = hooksFactory.forSubExecution(
'manual',
executionId,
workflowData,
optionalParameters,
);
const error = new Error('Metadata save failed');
executionMetadataService.save.mockRejectedValueOnce(error);
const runDataWithMetadata: IRun = {
data: {
resultData: {
runData: {},
metadata: {},
},
},
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
};
await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
});
});
});

View file

@ -0,0 +1,464 @@
import { stringify } from 'flatted';
import { pick } from 'lodash';
import type { ExecutionHooksOptionalParameters } from 'n8n-core';
import { ErrorReporter, ExecutionHooks, Logger } from 'n8n-core';
import type {
IRun,
ExecutionStatus,
WorkflowExecuteMode,
IWorkflowBase,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { EventService } from '@/events/event.service';
import { ExternalHooks } from '@/external-hooks';
import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces';
import { Push } from '@/push';
import { ExecutionMetadataService } from '@/services/execution-metadata.service';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { isWorkflowIdValid } from '@/utils';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { restoreBinaryDataId } from './restore-binary-data-id';
import { saveExecutionProgress } from './save-execution-progress';
import { determineFinalExecutionStatus } from './shared/shared-hook-functions';
import { toSaveSettings } from './to-save-settings';
@Service()
export class ExecutionHooksFactory {
constructor(
private readonly logger: Logger,
private readonly errorReporter: ErrorReporter,
private readonly executionRepository: ExecutionRepository,
private readonly externalHooks: ExternalHooks,
private readonly workflowStatisticsService: WorkflowStatisticsService,
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly executionMetadataService: ExecutionMetadataService,
private readonly eventService: EventService,
private readonly push: Push,
) {}
/** Returns ExecutionHooks instance for running the main workflow */
forExecutionOnMain(
{ executionMode, workflowData, pushRef, retryOf }: IWorkflowExecutionDataProcess,
executionId: string,
) {
const hooks = new ExecutionHooks(executionMode, executionId, workflowData, {
pushRef,
retryOf,
});
this.addPreExecuteHooks(hooks);
this.addSavingHooks(hooks, false);
this.addPushHooks(hooks);
return hooks;
}
/** Returns ExecutionHooks instance for main process if workflow runs via worker */
forExecutionOnWorker(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters: ExecutionHooksOptionalParameters = {},
) {
const hooks = new ExecutionHooks(mode, executionId, workflowData, optionalParameters);
this.addPreExecuteHooks(hooks);
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks.
const { executionRepository } = this;
// TODO: >>> clear all nodeExecuteAfter hooks <<<
// hookFunctions.nodeExecuteAfter = [];
hooks.addHook('workflowExecuteAfter', async function (fullRunData) {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave) {
await executionRepository.hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
}
});
return hooks;
}
/** Returns ExecutionHooks instance for running sub-workflows */
forSubExecution(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters: ExecutionHooksOptionalParameters,
) {
const hooks = new ExecutionHooks(mode, executionId, workflowData, optionalParameters);
this.addPreExecuteHooks(hooks);
this.addEventHooks(hooks);
this.addSavingHooks(hooks, false);
return hooks;
}
private addPreExecuteHooks(hooks: ExecutionHooks) {
const { externalHooks } = this;
hooks.addHook('workflowExecuteBefore', async function (workflow) {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
});
// TODO: skip this if saveSettings.progress is not true
hooks.addHook('nodeExecuteAfter', async function (nodeName, data, executionData) {
await saveExecutionProgress(
this.workflowData,
this.executionId,
nodeName,
data,
executionData,
this.pushRef,
);
});
}
private addEventHooks(hooks: ExecutionHooks) {
const { eventService, workflowStatisticsService } = this;
hooks.addHook('nodeExecuteBefore', async function (nodeName) {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
});
hooks.addHook('nodeExecuteAfter', async function (nodeName) {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
});
hooks.addHook('workflowExecuteBefore', async function () {
const { executionId, workflowData } = this;
eventService.emit('workflow-pre-execute', { executionId, data: workflowData });
});
hooks.addHook('nodeFetchedData', async (workflowId, node) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
});
}
/** Returns hook functions to save workflow execution and call error workflow */
private addSavingHooks(hooks: ExecutionHooks, isWorker: boolean) {
const {
errorReporter,
executionRepository,
logger,
workflowStatisticsService,
workflowStaticDataService,
} = this;
// eslint-disable-next-line @typescript-eslint/no-this-alias
const factory = this;
// eslint-disable-next-line complexity
hooks.addHook('workflowExecuteAfter', async function (fullRunData, newStaticData) {
logger.debug('Executing hook (hookFunctionsSave)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
isWorker,
});
// TODO: why is this skipped in the worker?
if (!isWorker) {
await restoreBinaryDataId(fullRunData, this.executionId, this.mode);
}
const isManualMode = this.mode === 'manual';
try {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await workflowStaticDataService.saveStaticDataById(this.workflowData.id, newStaticData);
} catch (e) {
errorReporter.error(e);
logger.error(
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${ensureError(e).message}" (hookFunctionsSave)`,
{ executionId: this.executionId, workflowId: this.workflowData.id },
);
}
}
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
/**
* When manual executions are not being saved, we only soft-delete
* the execution so that the user can access its binary data
* while building their workflow.
*
* The manual execution and its binary data will be hard-deleted
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await executionRepository.softDelete(this.executionId);
return;
}
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave && !fullRunData.waitTill && !isManualMode) {
WorkflowExecuteAdditionalData.executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
await executionRepository.hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
return;
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const fullExecutionData = factory.prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal: executionStatus,
retryOf: this.retryOf,
});
// When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef;
}
await factory.updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
if (!isManualMode) {
WorkflowExecuteAdditionalData.executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} catch (error) {
errorReporter.error(error);
logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
error: ensureError(error),
});
if (!isManualMode) {
WorkflowExecuteAdditionalData.executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} finally {
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
});
}
});
}
/** Returns hook functions to push data to Editor-UI */
private addPushHooks(hooks: ExecutionHooks) {
const { logger, push } = this;
hooks.addHook('nodeExecuteBefore', async function (nodeName) {
const { pushRef, executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
workflowId: this.workflowData.id,
});
push.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef);
});
hooks.addHook('nodeExecuteAfter', async function (nodeName, data) {
const { pushRef, executionId } = this;
// Push data to session which started workflow after each rendered node
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
workflowId: this.workflowData.id,
});
push.send({ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, pushRef);
});
hooks.addHook('workflowExecuteBefore', async function (_workflow, data) {
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
pushRef,
workflowId,
});
// Push data to session which started the workflow
if (pushRef === undefined) {
return;
}
push.send(
{
type: 'executionStarted',
data: {
executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId,
workflowName,
flattedRunData: data?.resultData.runData
? stringify(data.resultData.runData)
: stringify({}),
},
},
pushRef,
);
});
hooks.addHook('workflowExecuteAfter', async function (fullRunData) {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;
const { id: workflowId } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
pushRef,
workflowId,
});
const { status } = fullRunData;
if (status === 'waiting') {
push.send({ type: 'executionWaiting', data: { executionId } }, pushRef);
} else {
const rawData = stringify(fullRunData.data);
push.send(
{ type: 'executionFinished', data: { executionId, workflowId, status, rawData } },
pushRef,
);
}
});
}
private prepareExecutionDataForDbUpdate(parameters: {
runData: IRun;
workflowData: IWorkflowBase;
workflowStatusFinal: ExecutionStatus;
retryOf?: string;
}) {
const { runData, workflowData, workflowStatusFinal, retryOf } = parameters;
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const pristineWorkflowData: IWorkflowBase = pick(workflowData, [
'id',
'name',
'active',
'createdAt',
'updatedAt',
'nodes',
'connections',
'settings',
'staticData',
'pinData',
]);
const fullExecutionData: UpdateExecutionPayload = {
data: runData.data,
mode: runData.mode,
finished: runData.finished ? runData.finished : false,
startedAt: runData.startedAt,
stoppedAt: runData.stoppedAt,
workflowData: pristineWorkflowData,
waitTill: runData.waitTill,
status: workflowStatusFinal,
workflowId: pristineWorkflowData.id,
};
if (retryOf !== undefined) {
fullExecutionData.retryOf = retryOf.toString();
}
const workflowId = workflowData.id;
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}
return fullExecutionData;
}
private async updateExistingExecution(parameters: {
executionId: string;
workflowId: string;
executionData: Partial<IExecutionDb>;
}) {
const { executionId, workflowId, executionData } = parameters;
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
this.logger.debug(`Save execution data to database for execution ID ${executionId}`, {
executionId,
workflowId,
finished: executionData.finished,
stoppedAt: executionData.stoppedAt,
});
await this.executionRepository.updateExistingExecution(executionId, executionData);
try {
const metadata = executionData.data?.resultData.metadata;
if (metadata) {
await this.executionMetadataService.save(executionId, metadata);
}
} catch (e) {
const error = ensureError(e);
this.errorReporter.error(error);
this.logger.error(`Failed to save metadata for execution ID ${executionId}`, { error });
}
if (executionData.finished === true && executionData.retryOf !== undefined) {
await this.executionRepository.updateExistingExecution(executionData.retryOf, {
retrySuccessId: executionId,
});
}
}
}

View file

@ -13,6 +13,7 @@ export async function saveExecutionProgress(
executionData: IRunExecutionData,
pushRef?: string,
) {
// TODO: calculate these only once for lifecycle hooks setup
const saveSettings = toSaveSettings(workflowData.settings);
if (!saveSettings.progress) return;

View file

@ -1,12 +1,4 @@
import pick from 'lodash/pick';
import { Logger } from 'n8n-core';
import { ensureError, type ExecutionStatus, type IRun, type IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces';
import { ExecutionMetadataService } from '@/services/execution-metadata.service';
import { isWorkflowIdValid } from '@/utils';
import type { ExecutionStatus, IRun } from 'n8n-workflow';
export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus {
const workflowHasCrashed = runData.status === 'crashed';
@ -23,85 +15,3 @@ export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus {
if (runData.waitTill) workflowStatusFinal = 'waiting';
return workflowStatusFinal;
}
export function prepareExecutionDataForDbUpdate(parameters: {
runData: IRun;
workflowData: IWorkflowBase;
workflowStatusFinal: ExecutionStatus;
retryOf?: string;
}) {
const { runData, workflowData, workflowStatusFinal, retryOf } = parameters;
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const pristineWorkflowData: IWorkflowBase = pick(workflowData, [
'id',
'name',
'active',
'createdAt',
'updatedAt',
'nodes',
'connections',
'settings',
'staticData',
'pinData',
]);
const fullExecutionData: UpdateExecutionPayload = {
data: runData.data,
mode: runData.mode,
finished: runData.finished ? runData.finished : false,
startedAt: runData.startedAt,
stoppedAt: runData.stoppedAt,
workflowData: pristineWorkflowData,
waitTill: runData.waitTill,
status: workflowStatusFinal,
workflowId: pristineWorkflowData.id,
};
if (retryOf !== undefined) {
fullExecutionData.retryOf = retryOf.toString();
}
const workflowId = workflowData.id;
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}
return fullExecutionData;
}
export async function updateExistingExecution(parameters: {
executionId: string;
workflowId: string;
executionData: Partial<IExecutionDb>;
}) {
const logger = Container.get(Logger);
const { executionId, workflowId, executionData } = parameters;
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
logger.debug(`Save execution data to database for execution ID ${executionId}`, {
executionId,
workflowId,
finished: executionData.finished,
stoppedAt: executionData.stoppedAt,
});
await Container.get(ExecutionRepository).updateExistingExecution(executionId, executionData);
try {
if (executionData.data?.resultData.metadata) {
await Container.get(ExecutionMetadataService).save(
executionId,
executionData.data.resultData.metadata,
);
}
} catch (e) {
const error = ensureError(e);
logger.error(`Failed to save metadata for execution ID ${executionId}`, { error });
}
if (executionData.finished === true && executionData.retryOf !== undefined) {
await Container.get(ExecutionRepository).updateExistingExecution(executionData.retryOf, {
retrySuccessId: executionId,
});
}
}

View file

@ -10,6 +10,7 @@ import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import type { EventMessageTypes as EventMessage } from '@/eventbus/event-message-classes';
import { EventMessageNode } from '@/eventbus/event-message-classes/event-message-node';
import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { Push } from '@/push';
import { mockInstance } from '@test/mocking';
@ -26,10 +27,12 @@ describe('ExecutionRecoveryService', () => {
let executionRecoveryService: ExecutionRecoveryService;
let executionRepository: ExecutionRepository;
let executionHooksFactory: ExecutionHooksFactory;
beforeAll(async () => {
await testDb.init();
executionRepository = Container.get(ExecutionRepository);
executionHooksFactory = Container.get(ExecutionHooksFactory);
executionRecoveryService = new ExecutionRecoveryService(
mock(),
@ -37,6 +40,7 @@ describe('ExecutionRecoveryService', () => {
push,
executionRepository,
mock(),
executionHooksFactory,
);
});

View file

@ -9,9 +9,9 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { EventService } from '@/events/event.service';
import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory';
import type { IExecutionResponse } from '@/interfaces';
import { Push } from '@/push';
import { getWorkflowHooksMain } from '@/workflow-execute-additional-data'; // @TODO: Dependency cycle
import type { EventMessageTypes } from '../eventbus/event-message-classes';
@ -26,6 +26,7 @@ export class ExecutionRecoveryService {
private readonly push: Push,
private readonly executionRepository: ExecutionRepository,
private readonly eventService: EventService,
private readonly executionHooksFactory: ExecutionHooksFactory,
) {}
/**
@ -182,7 +183,7 @@ export class ExecutionRecoveryService {
runData: execution,
});
const externalHooks = getWorkflowHooksMain(
const lifecycleHooks = this.executionHooksFactory.forExecutionOnMain(
{
userId: '',
workflowData: execution.workflowData,
@ -204,6 +205,6 @@ export class ExecutionRecoveryService {
status: execution.status,
};
await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]);
await lifecycleHooks.executeHook('workflowExecuteAfter', [run]);
}
}

View file

@ -19,6 +19,7 @@ describe('JobProcessor', () => {
mock(),
mock(),
mock(),
mock(),
);
const result = await jobProcessor.processJob(mock<Job>());

View file

@ -8,6 +8,7 @@ import { Service } from 'typedi';
import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory';
import { NodeTypes } from '@/node-types';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
@ -34,6 +35,7 @@ export class JobProcessor {
private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes,
private readonly instanceSettings: InstanceSettings,
private readonly executionHooksFactory: ExecutionHooksFactory,
) {
this.logger = this.logger.scoped('scaling');
}
@ -115,25 +117,23 @@ export class JobProcessor {
executionTimeoutTimestamp,
);
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
additionalData.hooks = this.executionHooksFactory.forSubExecution(
execution.mode,
job.data.executionId,
execution.workflowData,
{ retryOf: execution.retryOf as string },
{ retryOf: execution.retryOf },
);
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
const msg: RespondToWebhookMessage = {
kind: 'respond-to-webhook',
executionId,
response: this.encodeWebhookResponse(response),
workerId: this.instanceSettings.hostId,
};
additionalData.hooks.addHook('sendResponse', async (response) => {
const msg: RespondToWebhookMessage = {
kind: 'respond-to-webhook',
executionId,
response: this.encodeWebhookResponse(response),
workerId: this.instanceSettings.hostId,
};
await job.progress(msg);
},
];
await job.progress(msg);
});
additionalData.executionId = executionId;

View file

@ -2,7 +2,7 @@ import { mock } from 'jest-mock-extended';
import type { INode } from 'n8n-workflow';
import { NodeOperationError, type Workflow } from 'n8n-workflow';
import { objectToError } from '../workflow-execute-additional-data';
import { objectToError } from '../object-to-error';
describe('objectToError', () => {
describe('node error handling', () => {

View file

@ -0,0 +1,53 @@
import { isObjectLiteral } from 'n8n-core';
import { NodeOperationError } from 'n8n-workflow';
import type { Workflow } from 'n8n-workflow';
export function objectToError(errorObject: unknown, workflow: Workflow): Error {
// TODO: Expand with other error types
if (errorObject instanceof Error) {
// If it's already an Error instance, return it as is.
return errorObject;
} else if (
isObjectLiteral(errorObject) &&
'message' in errorObject &&
typeof errorObject.message === 'string'
) {
// If it's an object with a 'message' property, create a new Error instance.
let error: Error | undefined;
if (
'node' in errorObject &&
isObjectLiteral(errorObject.node) &&
typeof errorObject.node.name === 'string'
) {
const node = workflow.getNode(errorObject.node.name);
if (node) {
error = new NodeOperationError(
node,
errorObject as unknown as Error,
errorObject as object,
);
}
}
if (error === undefined) {
error = new Error(errorObject.message);
}
if ('description' in errorObject) {
// @ts-expect-error Error descriptions are surfaced by the UI but
// not all backend errors account for this property yet.
error.description = errorObject.description as string;
}
if ('stack' in errorObject) {
// If there's a 'stack' property, set it on the new Error instance.
error.stack = errorObject.stack as string;
}
return error;
} else {
// If it's neither an Error nor an object with a 'message' property, create a generic Error.
return new Error('An error occurred');
}
}

View file

@ -4,9 +4,8 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { PushMessage, PushType } from '@n8n/api-types';
import { GlobalConfig } from '@n8n/config';
import { stringify } from 'flatted';
import { ErrorReporter, Logger, WorkflowExecute, isObjectLiteral } from 'n8n-core';
import { ApplicationError, NodeOperationError, Workflow, WorkflowHooks } from 'n8n-workflow';
import { ErrorReporter, Logger } from 'n8n-core';
import { ApplicationError } from 'n8n-workflow';
import type {
IDataObject,
IExecuteData,
@ -16,15 +15,11 @@ import type {
INodeParameters,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowBase,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
IWorkflowSettings,
WorkflowExecuteMode,
ExecutionStatus,
ExecutionError,
IExecuteFunctions,
ITaskDataConnections,
ExecuteWorkflowOptions,
@ -32,90 +27,27 @@ import type {
EnvProviderState,
ExecuteWorkflowData,
RelatedExecution,
Workflow,
} from 'n8n-workflow';
import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map';
import { ExternalHooks } from '@/external-hooks';
import type { IWorkflowErrorData, UpdateExecutionPayload } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import type { IWorkflowErrorData } from '@/interfaces';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { findSubworkflowStart } from '@/utils';
import * as WorkflowHelpers from '@/workflow-helpers';
import { WorkflowRepository } from './databases/repositories/workflow.repository';
import { EventService } from './events/event.service';
import { restoreBinaryDataId } from './execution-lifecycle-hooks/restore-binary-data-id';
import { saveExecutionProgress } from './execution-lifecycle-hooks/save-execution-progress';
import {
determineFinalExecutionStatus,
prepareExecutionDataForDbUpdate,
updateExistingExecution,
} from './execution-lifecycle-hooks/shared/shared-hook-functions';
import { toSaveSettings } from './execution-lifecycle-hooks/to-save-settings';
import { TaskManager } from './runners/task-managers/task-manager';
import { SecretsHelper } from './secrets-helpers.ee';
import { OwnershipService } from './services/ownership.service';
import { UrlService } from './services/url.service';
import { SubworkflowPolicyChecker } from './subworkflows/subworkflow-policy-checker.service';
import { PermissionChecker } from './user-management/permission-checker';
import { WorkflowRunner } from './workflow-runner';
import { WorkflowExecutionService } from './workflows/workflow-execution.service';
import { WorkflowStaticDataService } from './workflows/workflow-static-data.service';
export function objectToError(errorObject: unknown, workflow: Workflow): Error {
// TODO: Expand with other error types
if (errorObject instanceof Error) {
// If it's already an Error instance, return it as is.
return errorObject;
} else if (
isObjectLiteral(errorObject) &&
'message' in errorObject &&
typeof errorObject.message === 'string'
) {
// If it's an object with a 'message' property, create a new Error instance.
let error: Error | undefined;
if (
'node' in errorObject &&
isObjectLiteral(errorObject.node) &&
typeof errorObject.node.name === 'string'
) {
const node = workflow.getNode(errorObject.node.name);
if (node) {
error = new NodeOperationError(
node,
errorObject as unknown as Error,
errorObject as object,
);
}
}
if (error === undefined) {
error = new Error(errorObject.message);
}
if ('description' in errorObject) {
// @ts-expect-error Error descriptions are surfaced by the UI but
// not all backend errors account for this property yet.
error.description = errorObject.description as string;
}
if ('stack' in errorObject) {
// If there's a 'stack' property, set it on the new Error instance.
error.stack = errorObject.stack as string;
}
return error;
} else {
// If it's neither an Error nor an object with a 'message' property, create a generic Error.
return new Error('An error occurred');
}
}
/**
* Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects
@ -238,434 +170,6 @@ export function executeErrorWorkflow(
}
}
/**
* Returns hook functions to push data to Editor-UI
*
*/
function hookFunctionsPush(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const pushInstance = Container.get(Push);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { pushRef, executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
workflowId: this.workflowData.id,
});
pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef);
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
const { pushRef, executionId } = this;
// Push data to session which started workflow after each rendered node
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
workflowId: this.workflowData.id,
});
pushInstance.send(
{ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } },
pushRef,
);
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks, _workflow, data): Promise<void> {
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
pushRef,
workflowId,
});
// Push data to session which started the workflow
if (pushRef === undefined) {
return;
}
pushInstance.send(
{
type: 'executionStarted',
data: {
executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId,
workflowName,
flattedRunData: data?.resultData.runData
? stringify(data.resultData.runData)
: stringify({}),
},
},
pushRef,
);
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;
const { id: workflowId } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
pushRef,
workflowId,
});
const { status } = fullRunData;
if (status === 'waiting') {
pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef);
} else {
const rawData = stringify(fullRunData.data);
pushInstance.send(
{ type: 'executionFinished', data: { executionId, workflowId, status, rawData } },
pushRef,
);
}
},
],
};
}
export function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
const externalHooks = Container.get(ExternalHooks);
return {
workflowExecuteBefore: [
async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
},
],
nodeExecuteAfter: [
async function (
this: WorkflowHooks,
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
): Promise<void> {
await saveExecutionProgress(
this.workflowData,
this.executionId,
nodeName,
data,
executionData,
this.pushRef,
);
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*
*/
function hookFunctionsSave(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
logger.debug('Executing hook (hookFunctionsSave)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
});
await restoreBinaryDataId(fullRunData, this.executionId, this.mode);
const isManualMode = this.mode === 'manual';
try {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id,
newStaticData,
);
} catch (e) {
Container.get(ErrorReporter).error(e);
logger.error(
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`,
{ executionId: this.executionId, workflowId: this.workflowData.id },
);
}
}
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
/**
* When manual executions are not being saved, we only soft-delete
* the execution so that the user can access its binary data
* while building their workflow.
*
* The manual execution and its binary data will be hard-deleted
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
return;
}
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave && !fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
return;
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal: executionStatus,
retryOf: this.retryOf,
});
// When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef;
}
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} catch (error) {
Container.get(ErrorReporter).error(error);
logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
error,
});
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} finally {
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
});
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
* for running with queues. Manual executions should never run on queues as
* they are always executed in the main process.
*
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [
async function (): Promise<void> {
const { executionId, workflowData } = this;
eventService.emit('workflow-pre-execute', { executionId, data: workflowData });
},
],
workflowExecuteAfter: [
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
logger.debug('Executing hook (hookFunctionsSaveWorker)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
});
try {
if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id,
newStaticData,
);
} catch (e) {
Container.get(ErrorReporter).error(e);
logger.error(
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`,
{ pushRef: this.pushRef, workflowId: this.workflowData.id },
);
}
}
const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
fullRunData.status = workflowStatusFinal;
if (workflowStatusFinal !== 'success' && workflowStatusFinal !== 'waiting') {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal,
retryOf: this.retryOf,
});
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
} catch (error) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
} finally {
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
});
}
},
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('workflow-post-execute', {
workflow,
executionId,
runData,
});
},
async function (this: WorkflowHooks, fullRunData: IRun) {
const externalHooks = Container.get(ExternalHooks);
if (externalHooks.exists('workflow.postExecute')) {
try {
await externalHooks.run('workflow.postExecute', [
fullRunData,
this.workflowData,
this.executionId,
]);
} catch (error) {
Container.get(ErrorReporter).error(error);
Container.get(Logger).error(
'There was a problem running hook "workflow.postExecute"',
error,
);
}
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
export async function getRunData(
workflowData: IWorkflowBase,
inputData?: INodeExecutionData[],
@ -756,9 +260,9 @@ export async function getWorkflowData(
}
/**
* Executes the workflow with the given ID
* Executes a sub-workflow with the given ID
*/
export async function executeWorkflow(
export async function executeSubWorkflow(
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
@ -775,7 +279,7 @@ export async function executeWorkflow(
const executionId = await activeExecutions.add(runData);
const executionPromise = startExecution(
const executionPromise = Container.get(WorkflowRunner).runSubWorkflow(
additionalData,
options,
executionId,
@ -790,174 +294,6 @@ export async function executeWorkflow(
return await executionPromise;
}
async function startExecution(
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
executionId: string,
runData: IWorkflowExecutionDataProcess,
workflowData: IWorkflowBase,
): Promise<ExecuteWorkflowData> {
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions);
const eventService = Container.get(EventService);
const executionRepository = Container.get(ExecutionRepository);
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({
id: workflowData.id,
name: workflowName,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
/**
* A subworkflow execution in queue mode is not enqueued, but rather runs in the
* same worker process as the parent execution. Hence ensure the subworkflow
* execution is marked as started as well.
*/
await executionRepository.setRunning(executionId);
Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData });
let data;
try {
await Container.get(PermissionChecker).check(workflowData.id, workflowData.nodes);
await Container.get(SubworkflowPolicyChecker).check(
workflow,
options.parentWorkflowId,
options.node,
additionalData.userId,
);
// Create new additionalData to have different workflow loaded and to call
// different webhooks
const additionalDataIntegrated = await getBase();
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(
runData.executionMode,
executionId,
workflowData,
);
additionalDataIntegrated.executionId = executionId;
additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager;
// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process
// and get executionID from `activeExecutions` running on main process
additionalDataIntegrated.executeWorkflow = additionalData.executeWorkflow;
let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
const workflowSettings = workflowData.settings;
if (workflowSettings?.executionTimeout !== undefined && workflowSettings.executionTimeout > 0) {
// We might have received a max timeout timestamp from the parent workflow
// If we did, then we get the minimum time between the two timeouts
// If no timeout was given from the parent, then we use our timeout.
subworkflowTimeout = Math.min(
additionalData.executionTimeoutTimestamp || Number.MAX_SAFE_INTEGER,
Date.now() + workflowSettings.executionTimeout * 1000,
);
}
additionalDataIntegrated.executionTimeoutTimestamp = subworkflowTimeout;
const runExecutionData = runData.executionData as IRunExecutionData;
// Execute the workflow
const workflowExecute = new WorkflowExecute(
additionalDataIntegrated,
runData.executionMode,
runExecutionData,
);
const execution = workflowExecute.processRunExecutionData(workflow);
activeExecutions.attachWorkflowExecution(executionId, execution);
data = await execution;
} catch (error) {
const executionError = error ? (error as ExecutionError) : undefined;
const fullRunData: IRun = {
data: {
resultData: {
error: executionError,
runData: {},
},
},
finished: false,
mode: 'integrated',
startedAt: new Date(),
stoppedAt: new Date(),
status: 'error',
};
// When failing, we might not have finished the execution
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this
const fullExecutionData: UpdateExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
status: fullRunData.status,
workflowData,
workflowId: workflowData.id,
};
if (workflowData.id) {
fullExecutionData.workflowId = workflowData.id;
}
activeExecutions.finalizeExecution(executionId, fullRunData);
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError(
{
...executionError,
stack: executionError?.stack,
message: executionError?.message,
},
workflow,
);
}
await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);
eventService.emit('workflow-post-execute', {
workflow: workflowData,
executionId,
userId: additionalData.userId,
runData: data,
});
// subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here
if (data.finished === true || data.status === 'waiting') {
// Workflow did finish successfully
activeExecutions.finalizeExecution(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return {
executionId,
data: returnData!.data!.main,
waitTill: data.waitTill,
};
}
activeExecutions.finalizeExecution(executionId, data);
// Workflow did fail
const { error } = data.data.resultData;
throw objectToError(
{
...error,
stack: error?.stack,
},
workflow,
);
}
export function setExecutionStatus(status: ExecutionStatus) {
const logger = Container.get(Logger);
if (this.executionId === undefined) {
@ -1002,7 +338,7 @@ export async function getBase(
return {
credentialsHelper: Container.get(CredentialsHelper),
executeWorkflow,
executeSubWorkflow,
restApiUrl: urlBaseWebhook + globalConfig.endpoints.rest,
instanceBaseUrl: urlBaseWebhook,
formWaitingBaseUrl: urlBaseWebhook + globalConfig.endpoints.formWaiting,
@ -1056,127 +392,3 @@ export async function getBase(
eventService.emit(eventName, payload),
};
}
/**
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
function getWorkflowHooksIntegrated(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
): WorkflowHooks {
const hookFunctions = hookFunctionsSave();
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
}
/**
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksWorkerExecuter(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSaveWorker();
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker
*/
export function getWorkflowHooksWorkerMain(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPreExecute();
// TODO: why are workers pushing to frontend?
// TODO: simplifying this for now to just leave the bare minimum hooks
// const hookFunctions = hookFunctionsPush();
// const preExecuteFunctions = hookFunctionsPreExecute();
// for (const key of Object.keys(preExecuteFunctions)) {
// if (hookFunctions[key] === undefined) {
// hookFunctions[key] = [];
// }
// hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
// }
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks.
hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = [];
hookFunctions.workflowExecuteAfter = [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
}
},
];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running the main workflow
*
*/
export function getWorkflowHooksMain(
data: IWorkflowExecutionDataProcess,
executionId: string,
): WorkflowHooks {
const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], pushFunctions[key]);
}
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = [];
if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
pushRef: data.pushRef,
retryOf: data.retryOf as string,
});
}

View file

@ -2,6 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { ExecutionHooks } from 'n8n-core';
import { ErrorReporter, InstanceSettings, Logger, WorkflowExecute } from 'n8n-core';
import type {
ExecutionError,
@ -10,8 +11,11 @@ import type {
IPinData,
IRun,
WorkflowExecuteMode,
WorkflowHooks,
IWorkflowExecutionDataProcess,
IWorkflowExecuteAdditionalData,
ExecuteWorkflowOptions,
IWorkflowBase,
ExecuteWorkflowData,
} from 'n8n-workflow';
import { ExecutionCancelledError, Workflow } from 'n8n-workflow';
import PCancelable from 'p-cancelable';
@ -20,19 +24,22 @@ import { Container, Service } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { EventService } from '@/events/event.service';
import { ExecutionHooksFactory } from '@/execution-lifecycle-hooks/execution-hooks-factory';
import { ExternalHooks } from '@/external-hooks';
import type { UpdateExecutionPayload } from '@/interfaces';
import { ManualExecutionService } from '@/manual-execution.service';
import { NodeTypes } from '@/node-types';
import type { ScalingService } from '@/scaling/scaling.service';
import type { Job, JobData } from '@/scaling/scaling.types';
import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service';
import { PermissionChecker } from '@/user-management/permission-checker';
import { objectToError } from '@/utils/object-to-error';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { generateFailedExecutionFromError } from '@/workflow-helpers';
import { generateFailedExecutionFromError, getDataLastExecutedNodeData } from '@/workflow-helpers';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { ExecutionNotFoundError } from './errors/execution-not-found-error';
import { EventService } from './events/event.service';
import { ManualExecutionService } from './manual-execution.service';
@Service()
export class WorkflowRunner {
private scalingService: ScalingService;
@ -48,6 +55,7 @@ export class WorkflowRunner {
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker,
private readonly subworkflowPolicyChecker: SubworkflowPolicyChecker,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
private readonly manualExecutionService: ManualExecutionService,
@ -59,7 +67,7 @@ export class WorkflowRunner {
startedAt: Date,
executionMode: WorkflowExecuteMode,
executionId: string,
hooks?: WorkflowHooks,
hooks?: ExecutionHooks,
) {
// This means the execution was probably cancelled and has already
// been cleaned up.
@ -109,9 +117,7 @@ export class WorkflowRunner {
// set the execution to failed.
this.activeExecutions.finalizeExecution(executionId, fullRunData);
if (hooks) {
await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]);
}
await hooks?.executeHook('workflowExecuteAfter', [fullRunData]);
}
/** Run the workflow
@ -131,14 +137,12 @@ export class WorkflowRunner {
try {
await this.permissionChecker.check(workflowId, nodes);
} catch (error) {
const executionHooksFactory = Container.get(ExecutionHooksFactory);
// Create a failed execution with the data for the node, save it and abort execution
const runData = generateFailedExecutionFromError(data.executionMode, error, error.node);
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
await workflowHooks.executeHookFunctions('workflowExecuteBefore', [
undefined,
data.executionData,
]);
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
const hooks = executionHooksFactory.forExecutionOnMain(data, executionId);
await hooks.executeHook('workflowExecuteBefore', [undefined, data.executionData]);
await hooks.executeHook('workflowExecuteAfter', [runData]);
responsePromise?.reject(error);
this.activeExecutions.finalizeExecution(executionId);
return executionId;
@ -258,13 +262,12 @@ export class WorkflowRunner {
await this.executionRepository.setRunning(executionId); // write
try {
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
const executionHooksFactory = Container.get(ExecutionHooksFactory);
additionalData.hooks = executionHooksFactory.forExecutionOnMain(data, executionId);
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
this.activeExecutions.resolveResponsePromise(executionId, response);
},
];
additionalData.hooks.addHook('sendResponse', async (response) => {
this.activeExecutions.resolveResponsePromise(executionId, response);
});
additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
executionId,
@ -351,14 +354,15 @@ export class WorkflowRunner {
this.scalingService = Container.get(ScalingService);
}
const executionHooksFactory = Container.get(ExecutionHooksFactory);
// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.
// Check if they get retried by default and how often.
let job: Job;
let hooks: WorkflowHooks;
let lifecycleHooks: ExecutionHooks;
try {
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(
lifecycleHooks = executionHooksFactory.forExecutionOnWorker(
data.executionMode,
executionId,
data.workflowData,
@ -367,11 +371,11 @@ export class WorkflowRunner {
// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await hooks.executeHookFunctions('workflowExecuteBefore', [undefined, data.executionData]);
await lifecycleHooks.executeHook('workflowExecuteBefore', [undefined, data.executionData]);
} catch (error) {
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
const hooks = executionHooksFactory.forExecutionOnWorker(
data.executionMode,
executionId,
data.workflowData,
@ -387,9 +391,9 @@ export class WorkflowRunner {
onCancel(async () => {
await this.scalingService.stopJob(job);
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.
const hooksWorker = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
const hooksWorker = executionHooksFactory.forExecutionOnWorker(
data.executionMode,
executionId,
data.workflowData,
@ -405,9 +409,9 @@ export class WorkflowRunner {
try {
await job.finished();
} catch (error) {
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
const hooks = executionHooksFactory.forExecutionOnWorker(
data.executionMode,
executionId,
data.workflowData,
@ -440,7 +444,7 @@ export class WorkflowRunner {
// Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await hooks.executeHookFunctions('workflowExecuteAfter', [runData]);
await lifecycleHooks.executeHook('workflowExecuteAfter', [runData]);
resolve(runData);
},
@ -454,4 +458,172 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
}
async runSubWorkflow(
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
executionId: string,
runData: IWorkflowExecutionDataProcess,
workflowData: IWorkflowBase,
): Promise<ExecuteWorkflowData> {
const { activeExecutions, externalHooks, eventService, executionRepository, nodeTypes } = this;
await externalHooks.init();
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({
id: workflowData.id,
name: workflowName,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
/**
* A subworkflow execution in queue mode is not enqueued, but rather runs in the
* same worker process as the parent execution. Hence ensure the subworkflow
* execution is marked as started as well.
*/
await executionRepository.setRunning(executionId);
eventService.emit('workflow-pre-execute', { executionId, data: runData });
let data;
try {
await this.permissionChecker.check(workflowData.id, workflowData.nodes);
await this.subworkflowPolicyChecker.check(
workflow,
options.parentWorkflowId,
options.node,
additionalData.userId,
);
const executionHooksFactory = Container.get(ExecutionHooksFactory);
// Create new additionalData to have different workflow loaded and to call
// different webhooks
const additionalDataIntegrated = await WorkflowExecuteAdditionalData.getBase();
additionalDataIntegrated.hooks = executionHooksFactory.forSubExecution(
runData.executionMode,
executionId,
workflowData,
{ pushRef: runData.pushRef, retryOf: runData.retryOf },
);
additionalDataIntegrated.executionId = executionId;
additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager;
// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process
// and get executionID from `activeExecutions` running on main process
additionalDataIntegrated.executeSubWorkflow = additionalData.executeSubWorkflow;
let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
const workflowSettings = workflowData.settings;
if (
workflowSettings?.executionTimeout !== undefined &&
workflowSettings.executionTimeout > 0
) {
// We might have received a max timeout timestamp from the parent workflow
// If we did, then we get the minimum time between the two timeouts
// If no timeout was given from the parent, then we use our timeout.
subworkflowTimeout = Math.min(
additionalData.executionTimeoutTimestamp || Number.MAX_SAFE_INTEGER,
Date.now() + workflowSettings.executionTimeout * 1000,
);
}
additionalDataIntegrated.executionTimeoutTimestamp = subworkflowTimeout;
// Execute the workflow
const workflowExecute = new WorkflowExecute(
additionalDataIntegrated,
runData.executionMode,
runData.executionData,
);
const execution = workflowExecute.processRunExecutionData(workflow);
activeExecutions.attachWorkflowExecution(executionId, execution);
data = await execution;
} catch (error) {
const executionError = error ? (error as ExecutionError) : undefined;
const fullRunData: IRun = {
data: {
resultData: {
error: executionError,
runData: {},
},
},
finished: false,
mode: 'integrated',
startedAt: new Date(),
stoppedAt: new Date(),
status: 'error',
};
// When failing, we might not have finished the execution
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this
const fullExecutionData: UpdateExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
status: fullRunData.status,
workflowData,
workflowId: workflowData.id,
};
if (workflowData.id) {
fullExecutionData.workflowId = workflowData.id;
}
activeExecutions.finalizeExecution(executionId, fullRunData);
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError(
{
...executionError,
stack: executionError?.stack,
message: executionError?.message,
},
workflow,
);
}
await this.externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);
eventService.emit('workflow-post-execute', {
workflow: workflowData,
executionId,
userId: additionalData.userId,
runData: data,
});
// subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here
if (data.finished === true || data.status === 'waiting') {
// Workflow did finish successfully
activeExecutions.finalizeExecution(executionId, data);
const returnData = getDataLastExecutedNodeData(data);
return {
executionId,
data: returnData!.data!.main,
waitTill: data.waitTill,
};
}
activeExecutions.finalizeExecution(executionId, data);
// Workflow did fail
const { error } = data.data.resultData;
throw objectToError(
{
...error,
stack: error?.stack,
},
workflow,
);
}
}

View file

@ -4,6 +4,8 @@ import type {
ValidationResult,
} from 'n8n-workflow';
import type { ExecutionHooks } from '@/execution-hooks';
export type Class<T = object, A extends unknown[] = unknown[]> = new (...args: A) => T;
export interface IResponseError extends Error {
@ -35,4 +37,10 @@ export namespace n8n {
}
}
declare module 'n8n-workflow' {
interface IWorkflowExecuteAdditionalData {
hooks?: ExecutionHooks;
}
}
export type ExtendedValidationResult = ValidationResult & { fieldName?: string };

View file

@ -773,7 +773,7 @@ export async function proxyRequestToAxios(
} else if (body === '') {
body = axiosConfig.responseType === 'arraybuffer' ? Buffer.alloc(0) : undefined;
}
await additionalData?.hooks?.executeHookFunctions('nodeFetchedData', [workflow?.id, node]);
await additionalData?.hooks?.executeHook('nodeFetchedData', [workflow?.id, node]);
return configObject.resolveWithFullResponse
? {
body,

View file

@ -46,6 +46,7 @@ export class TriggersAndPollers {
// Add the manual trigger response which resolves when the first time data got emitted
triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => {
const hooks = additionalData.hooks!;
triggerFunctions.emit = (
(resolveEmit) =>
(
@ -53,19 +54,12 @@ export class TriggersAndPollers {
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun>,
) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) {
responsePromise.resolve(response);
}
},
];
if (responsePromise) {
hooks.addHook('sendResponse', async (response) => responsePromise.resolve(response));
}
if (donePromise) {
additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift(
async (runData: IRun): Promise<void> => {
return donePromise.resolve(runData);
},
hooks.addHook('workflowExecuteAfter', async (runData) =>
donePromise.resolve(runData),
);
}
@ -75,13 +69,9 @@ export class TriggersAndPollers {
triggerFunctions.emitError = (
(rejectEmit) =>
(error: Error, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (): Promise<void> => {
if (responsePromise) {
responsePromise.reject(error);
}
},
];
if (responsePromise) {
hooks.addHook('sendResponse', async () => responsePromise.reject(error));
}
rejectEmit(error);
}

View file

@ -404,19 +404,6 @@ export class WorkflowExecute {
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
}
/**
* Executes the hook with the given name
*
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async executeHook(hookName: string, parameters: any[]): Promise<void> {
if (this.additionalData.hooks === undefined) {
return;
}
return await this.additionalData.hooks.executeHookFunctions(hookName, parameters);
}
moveNodeMetadata(): void {
const metadata = get(this.runExecutionData, 'executionData.metadata');
@ -1234,14 +1221,17 @@ export class WorkflowExecute {
this.status = 'canceled';
this.abortController.abort();
const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
void this.additionalData.hooks?.executeHook('workflowExecuteAfter', [fullRunData]);
});
// eslint-disable-next-line complexity
const returnPromise = (async () => {
try {
if (!this.additionalData.restartExecutionId) {
await this.executeHook('workflowExecuteBefore', [workflow, this.runExecutionData]);
await this.additionalData.hooks?.executeHook('workflowExecuteBefore', [
workflow,
this.runExecutionData,
]);
}
} catch (error) {
const e = error as unknown as ExecutionBaseError;
@ -1325,7 +1315,7 @@ export class WorkflowExecute {
node: executionNode.name,
workflowId: workflow.id,
});
await this.executeHook('nodeExecuteBefore', [executionNode.name]);
await this.additionalData.hooks?.executeHook('nodeExecuteBefore', [executionNode.name]);
// Get the index of the current run
runIndex = 0;
@ -1774,7 +1764,7 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
// Only execute the nodeExecuteAfter hook if the node did not get aborted
if (!this.isCancelled) {
await this.executeHook('nodeExecuteAfter', [
await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@ -1816,7 +1806,7 @@ export class WorkflowExecute {
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
if (this.runExecutionData.waitTill) {
await this.executeHook('nodeExecuteAfter', [
await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@ -1835,7 +1825,7 @@ export class WorkflowExecute {
) {
// Before stopping, make sure we are executing hooks so
// That frontend is notified for example for manual executions.
await this.executeHook('nodeExecuteAfter', [
await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@ -1945,7 +1935,7 @@ export class WorkflowExecute {
// Execute hooks now to make sure that all hooks are executed properly
// Await is needed to make sure that we don't fall into concurrency problems
// When saving node execution data
await this.executeHook('nodeExecuteAfter', [
await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@ -2148,12 +2138,14 @@ export class WorkflowExecute {
this.moveNodeMetadata();
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(
// eslint-disable-next-line @typescript-eslint/no-shadow
(error) => {
console.error('There was a problem running hook "workflowExecuteAfter"', error);
},
);
await this.additionalData.hooks
?.executeHook('workflowExecuteAfter', [fullRunData, newStaticData])
.catch(
// eslint-disable-next-line @typescript-eslint/no-shadow
(error) => {
console.error('There was a problem running hook "workflowExecuteAfter"', error);
},
);
if (closeFunction) {
try {
@ -2220,7 +2212,10 @@ export class WorkflowExecute {
this.moveNodeMetadata();
// Prevent from running the hook if the error is an abort error as it was already handled
if (!this.isCancelled) {
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await this.additionalData.hooks?.executeHook('workflowExecuteAfter', [
fullRunData,
newStaticData,
]);
}
if (closeFunction) {

View file

@ -0,0 +1,114 @@
import { mock } from 'jest-mock-extended';
import type {
IDataObject,
IExecuteResponsePromiseData,
INode,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowBase,
Workflow,
} from 'n8n-workflow';
import type { ExecutionHookName, RegisteredHooks } from '@/execution-hooks';
import { ExecutionHooks } from '@/execution-hooks';
describe('ExecutionHooks', () => {
const executionId = '123';
const pushRef = 'test-ref';
const retryOf = 'test-retry';
const workflowData = mock<IWorkflowBase>();
let hooks: ExecutionHooks;
beforeEach(() => {
jest.clearAllMocks();
hooks = new ExecutionHooks('internal', executionId, workflowData, {
pushRef,
retryOf,
});
});
describe('constructor()', () => {
it('should initialize with correct properties', () => {
expect(hooks.mode).toBe('internal');
expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toBe(workflowData);
expect(hooks.pushRef).toBe(pushRef);
expect(hooks.retryOf).toBe(retryOf);
// @ts-expect-error private property
expect(hooks.registered).toEqual({
nodeExecuteAfter: [],
nodeExecuteBefore: [],
nodeFetchedData: [],
sendResponse: [],
workflowExecuteAfter: [],
workflowExecuteBefore: [],
});
});
});
describe('addHook()', () => {
const hooksHandler =
mock<{
[K in keyof RegisteredHooks]: RegisteredHooks[K][number];
}>();
const testCases: Array<{ hook: ExecutionHookName; args: unknown[] }> = [
{ hook: 'nodeExecuteBefore', args: ['testNode'] },
{
hook: 'nodeExecuteAfter',
args: ['testNode', mock<ITaskData>(), mock<IRunExecutionData>()],
},
{ hook: 'workflowExecuteBefore', args: [mock<Workflow>(), mock<IRunExecutionData>()] },
{ hook: 'workflowExecuteAfter', args: [mock<IRun>(), mock<IDataObject>()] },
{ hook: 'sendResponse', args: [mock<IExecuteResponsePromiseData>()] },
{ hook: 'nodeFetchedData', args: ['workflow123', mock<INode>()] },
];
test.each(testCases)('should add and process $hook hooks', async ({ hook, args }) => {
hooks.addHook(hook, hooksHandler[hook]);
await hooks.executeHook(hook, args);
expect(hooksHandler[hook]).toHaveBeenCalledWith(...args);
});
});
describe('executeHook()', () => {
it('should execute multiple hooks in order', async () => {
const executionOrder: string[] = [];
const hook1 = jest.fn().mockImplementation(async () => {
executionOrder.push('hook1');
});
const hook2 = jest.fn().mockImplementation(async () => {
executionOrder.push('hook2');
});
hooks.addHook('nodeExecuteBefore', hook1, hook2);
await hooks.executeHook('nodeExecuteBefore', ['testNode']);
expect(executionOrder).toEqual(['hook1', 'hook2']);
expect(hook1).toHaveBeenCalled();
expect(hook2).toHaveBeenCalled();
});
it('should maintain correct "this" context', async () => {
const hook = jest.fn().mockImplementation(async function (this: ExecutionHooks) {
expect(this.executionId).toBe(executionId);
expect(this.mode).toBe('internal');
});
hooks.addHook('nodeExecuteBefore', hook);
await hooks.executeHook('nodeExecuteBefore', ['testNode']);
expect(hook).toHaveBeenCalled();
});
it('should handle errors in hooks', async () => {
const errorHook = jest.fn().mockRejectedValue(new Error('Hook failed'));
hooks.addHook('nodeExecuteBefore', errorHook);
await expect(hooks.executeHook('nodeExecuteBefore', ['testNode'])).rejects.toThrow(
'Hook failed',
);
});
});
});

View file

@ -0,0 +1,119 @@
import type {
IDataObject,
IExecuteResponsePromiseData,
INode,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowBase,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
/** This defines the types of hooks that can be executed at different stages of a workflow execution. */
export interface RegisteredHooks {
/** Executed before a node starts executing */
nodeExecuteBefore: Array<(this: ExecutionHooks, nodeName: string) => Promise<void>>;
/** Executed after a node finishes executing */
nodeExecuteAfter: Array<
(
this: ExecutionHooks,
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
) => Promise<void>
>;
/** Executed before workflow execution starts */
workflowExecuteBefore: Array<
(this: ExecutionHooks, workflow?: Workflow, data?: IRunExecutionData) => Promise<void>
>;
/** Executed after workflow execution completes */
workflowExecuteAfter: Array<
(this: ExecutionHooks, data: IRun, newStaticData: IDataObject) => Promise<void>
>;
/** Used by trigger and webhook nodes to respond back to the request */
sendResponse: Array<
(this: ExecutionHooks, response: IExecuteResponsePromiseData) => Promise<void>
>;
/** Executed when a node fetches data */
nodeFetchedData: Array<(this: ExecutionHooks, workflowId: string, node: INode) => Promise<void>>;
}
export type ExecutionHookName = keyof RegisteredHooks;
export interface ExecutionHooksOptionalParameters {
retryOf?: string;
pushRef?: string;
}
/**
* This class serves as a container for execution lifecycle hooks that get triggered during different stages of an execution.
* It manages and executes callback functions registered for specific execution events.
*
* Common use cases include:
* - Saving execution progress to database
* - Pushing execution status updates to the frontend UI
* - Recording workflow statistics
* - Running external hooks for execution events
* - Error and Cancellation handling and cleanup
*
* Example usage:
* ```typescript
* const hooks = new ExecutionHooks(mode, executionId, workflowData);
* hooks.add('workflowExecuteAfter, async function(fullRunData) {
* await saveToDatabase(this.executionId, fullRunData);
*});
* ```
*/
export class ExecutionHooks {
pushRef?: string;
retryOf?: string;
private readonly registered: RegisteredHooks = {
nodeExecuteAfter: [],
nodeExecuteBefore: [],
nodeFetchedData: [],
sendResponse: [],
workflowExecuteAfter: [],
workflowExecuteBefore: [],
};
constructor(
readonly mode: WorkflowExecuteMode,
readonly executionId: string,
readonly workflowData: IWorkflowBase,
optionalParameters: ExecutionHooksOptionalParameters = {},
) {
this.pushRef = optionalParameters.pushRef;
// retryOf might be `null` from TypeORM
this.retryOf = optionalParameters.retryOf ?? undefined;
}
async executeHook<
Hook extends keyof RegisteredHooks,
Params extends unknown[] = Parameters<Exclude<RegisteredHooks[Hook], undefined>[number]>,
>(hookName: Hook, parameters: Params) {
const hooks = this.registered[hookName];
for (const hookFunction of hooks) {
const typedHookFunction = hookFunction as unknown as (
this: ExecutionHooks,
...args: Params
) => Promise<void>;
await typedHookFunction.apply(this, parameters);
}
}
addHook<Hook extends keyof RegisteredHooks>(
hookName: Hook,
...hookFunctions: Array<RegisteredHooks[Hook][number]>
): void {
// @ts-expect-error FIX THIS
this.registered[hookName].push(...hookFunctions);
}
}

View file

@ -12,6 +12,7 @@ export * from './DirectoryLoader';
export * from './Interfaces';
export { InstanceSettings, InstanceType } from './InstanceSettings';
export { Logger } from './logging/logger';
export { ExecutionHooks, ExecutionHooksOptionalParameters } from './execution-hooks';
export * from './NodeExecuteFunctions';
export * from './RoutingNode';
export * from './WorkflowExecute';

View file

@ -210,7 +210,7 @@ export const describeCommonTests = (
};
it('should execute workflow and return data', async () => {
additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData);
additionalData.executeSubWorkflow.mockResolvedValue(executeWorkflowData);
binaryDataService.duplicateBinaryData.mockResolvedValue(data);
const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
@ -227,7 +227,7 @@ export const describeCommonTests = (
it('should put execution to wait if waitTill is returned', async () => {
const waitTill = new Date();
additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill });
additionalData.executeSubWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill });
binaryDataService.duplicateBinaryData.mockResolvedValue(data);
const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {

View file

@ -117,7 +117,7 @@ export class BaseExecuteContext extends NodeExecutionContext {
parentExecution?: RelatedExecution;
},
): Promise<ExecuteWorkflowData> {
const result = await this.additionalData.executeWorkflow(workflowInfo, this.additionalData, {
const result = await this.additionalData.executeSubWorkflow(workflowInfo, this.additionalData, {
...options,
parentWorkflowId: this.workflow.id,
inputData,

View file

@ -192,7 +192,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
}
async sendResponse(response: IExecuteResponsePromiseData): Promise<void> {
await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]);
await this.additionalData.hooks?.executeHook('sendResponse', [response]);
}
/** @deprecated use ISupplyDataFunctions.addInputData */

View file

@ -256,12 +256,12 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
}
runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData;
await additionalData.hooks?.executeHookFunctions('nodeExecuteBefore', [nodeName]);
await additionalData.hooks?.executeHook('nodeExecuteBefore', [nodeName]);
} else {
// Outputs
taskData.executionTime = new Date().getTime() - taskData.startTime;
await additionalData.hooks?.executeHookFunctions('nodeExecuteAfter', [
await additionalData.hooks?.executeHook('nodeExecuteAfter', [
nodeName,
taskData,
this.runExecutionData,

View file

@ -12,7 +12,6 @@ import type {
ITaskDataConnections,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowHooks,
} from 'n8n-workflow';
import nock from 'nock';
import { tmpdir } from 'os';
@ -22,6 +21,7 @@ import type { SecureContextOptions } from 'tls';
import Container from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import type { ExecutionHooks } from '@/execution-hooks';
import { InstanceSettings } from '@/InstanceSettings';
import {
binaryToString,
@ -401,12 +401,12 @@ describe('NodeExecuteFunctions', () => {
describe('proxyRequestToAxios', () => {
const baseUrl = 'http://example.de';
const workflow = mock<Workflow>();
const hooks = mock<WorkflowHooks>();
const hooks = mock<ExecutionHooks>();
const additionalData = mock<IWorkflowExecuteAdditionalData>({ hooks });
const node = mock<INode>();
beforeEach(() => {
hooks.executeHookFunctions.mockClear();
hooks.executeHook.mockClear();
});
test('should rethrow an error with `status` property', async () => {
@ -422,10 +422,7 @@ describe('NodeExecuteFunctions', () => {
test('should not throw if the response status is 200', async () => {
nock(baseUrl).get('/test').reply(200);
await proxyRequestToAxios(workflow, additionalData, node, `${baseUrl}/test`);
expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [
workflow.id,
node,
]);
expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]);
});
test('should throw if the response status is 403', async () => {
@ -445,7 +442,7 @@ describe('NodeExecuteFunctions', () => {
expect(error.config).toBeUndefined();
expect(error.message).toEqual('403 - "Forbidden"');
}
expect(hooks.executeHookFunctions).not.toHaveBeenCalled();
expect(hooks.executeHook).not.toHaveBeenCalled();
});
test('should not throw if the response status is 404, but `simple` option is set to `false`', async () => {
@ -456,10 +453,7 @@ describe('NodeExecuteFunctions', () => {
});
expect(response).toEqual('Not Found');
expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [
workflow.id,
node,
]);
expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]);
});
test('should return full response when `resolveWithFullResponse` is set to true', async () => {
@ -476,10 +470,7 @@ describe('NodeExecuteFunctions', () => {
statusCode: 404,
statusMessage: null,
});
expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [
workflow.id,
node,
]);
expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]);
});
describe('redirects', () => {

View file

@ -9,10 +9,10 @@ import type {
INodeType,
INodeTypes,
ITriggerFunctions,
WorkflowHooks,
IRun,
} from 'n8n-workflow';
import { ExecutionHooks } from '@/execution-hooks';
import { TriggersAndPollers } from '@/TriggersAndPollers';
describe('TriggersAndPollers', () => {
@ -23,15 +23,8 @@ describe('TriggersAndPollers', () => {
});
const nodeTypes = mock<INodeTypes>();
const workflow = mock<Workflow>({ nodeTypes });
const hookFunctions = mock<WorkflowHooks['hookFunctions']>({
sendResponse: [],
workflowExecuteAfter: [],
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
hooks: {
hookFunctions,
},
});
const hooks = new ExecutionHooks('internal', '123', mock());
const additionalData = mock<IWorkflowExecuteAdditionalData>({ hooks });
const triggersAndPollers = new TriggersAndPollers();
beforeEach(() => {
@ -98,8 +91,7 @@ describe('TriggersAndPollers', () => {
getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise);
expect(hookFunctions.sendResponse?.length).toBe(1);
await hookFunctions.sendResponse![0]?.({ testResponse: true });
await hooks.executeHook('sendResponse', [{ testResponse: true }]);
expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true });
});
@ -111,10 +103,10 @@ describe('TriggersAndPollers', () => {
await runTriggerHelper('manual');
getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise, donePromise);
await hookFunctions.sendResponse![0]?.({ testResponse: true });
await hooks.executeHook('sendResponse', [{ testResponse: true }]);
expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true });
await hookFunctions.workflowExecuteAfter?.[0]?.(mockRunData, {});
await hooks.executeHook('workflowExecuteAfter', [mockRunData, {}]);
expect(donePromise.resolve).toHaveBeenCalledWith(mockRunData);
});
});

View file

@ -6,7 +6,6 @@ import type {
INodeType,
INodeTypes,
IRun,
ITaskData,
IVersionedNodeType,
IWorkflowBase,
IWorkflowExecuteAdditionalData,
@ -14,10 +13,11 @@ import type {
WorkflowTestData,
INodeTypeData,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WorkflowHooks } from 'n8n-workflow';
import { ApplicationError, NodeHelpers } from 'n8n-workflow';
import path from 'path';
import { UnrecognizedNodeTypeError } from '@/errors';
import { ExecutionHooks } from '@/execution-hooks';
import { predefinedNodesTypes } from './constants';
@ -53,22 +53,14 @@ export function WorkflowExecuteAdditionalData(
waitPromise: IDeferredPromise<IRun>,
nodeExecutionOrder: string[],
): IWorkflowExecuteAdditionalData {
const hookFunctions = {
nodeExecuteAfter: [
async (nodeName: string, _data: ITaskData): Promise<void> => {
nodeExecutionOrder.push(nodeName);
},
],
workflowExecuteAfter: [
async (fullRunData: IRun): Promise<void> => {
waitPromise.resolve(fullRunData);
},
],
};
return mock<IWorkflowExecuteAdditionalData>({
hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()),
const hooks = new ExecutionHooks('trigger', '1', mock());
hooks.addHook('nodeExecuteAfter', async (nodeName): Promise<void> => {
nodeExecutionOrder.push(nodeName);
});
hooks.addHook('workflowExecuteAfter', async (fullRunData): Promise<void> => {
waitPromise.resolve(fullRunData);
});
return mock<IWorkflowExecuteAdditionalData>({ hooks });
}
const preparePinData = (pinData: IDataObject) => {

View file

@ -7,6 +7,7 @@ import {
Credentials,
UnrecognizedNodeTypeError,
constructExecutionMetaData,
ExecutionHooks,
} from 'n8n-core';
import type {
CredentialLoadingDetails,
@ -27,14 +28,13 @@ import type {
INodeTypeData,
INodeTypes,
IRun,
ITaskData,
IVersionedNodeType,
IWorkflowBase,
IWorkflowExecuteAdditionalData,
NodeLoadingDetails,
WorkflowTestData,
} from 'n8n-workflow';
import { ApplicationError, ICredentialsHelper, NodeHelpers, WorkflowHooks } from 'n8n-workflow';
import { ApplicationError, ICredentialsHelper, NodeHelpers } from 'n8n-workflow';
import nock from 'nock';
import { tmpdir } from 'os';
import path from 'path';
@ -156,22 +156,17 @@ export function WorkflowExecuteAdditionalData(
waitPromise: IDeferredPromise<IRun>,
nodeExecutionOrder: string[],
): IWorkflowExecuteAdditionalData {
const hookFunctions = {
nodeExecuteAfter: [
async (nodeName: string, _data: ITaskData): Promise<void> => {
nodeExecutionOrder.push(nodeName);
},
],
workflowExecuteAfter: [
async (fullRunData: IRun): Promise<void> => {
waitPromise.resolve(fullRunData);
},
],
};
const hooks = new ExecutionHooks('trigger', '1', mock());
hooks.addHook('nodeExecuteAfter', async (nodeName) => {
nodeExecutionOrder.push(nodeName);
});
hooks.addHook('workflowExecuteAfter', async (fullRunData) => {
waitPromise.resolve(fullRunData);
});
return mock<IWorkflowExecuteAdditionalData>({
credentialsHelper: new CredentialsHelper(),
hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()),
hooks,
// Get from node.parameters
currentNodeParameters: undefined,
});

View file

@ -3,7 +3,8 @@ import { mock } from 'jest-mock-extended';
import get from 'lodash/get';
import merge from 'lodash/merge';
import set from 'lodash/set';
import { PollContext, returnJsonArray, type InstanceSettings } from 'n8n-core';
import { PollContext, returnJsonArray } from 'n8n-core';
import type { InstanceSettings, ExecutionHooks } from 'n8n-core';
import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager';
import type {
IBinaryData,
@ -19,7 +20,6 @@ import type {
NodeTypeAndVersion,
VersionedNodeType,
Workflow,
WorkflowHooks,
} from 'n8n-workflow';
type MockDeepPartial<T> = Parameters<typeof mock<T>>[0];
@ -212,7 +212,7 @@ export async function testPollingTriggerNode(
return options as IHttpRequestOptions;
},
}),
hooks: mock<WorkflowHooks>(),
hooks: mock<ExecutionHooks>(),
}),
mode,
'init',

View file

@ -24,7 +24,6 @@ import type { ExecutionStatus } from './ExecutionStatus';
import type { Result } from './result';
import type { Workflow } from './Workflow';
import type { EnvProviderState } from './WorkflowDataProxyEnvProvider';
import type { WorkflowHooks } from './WorkflowHooks';
export interface IAdditionalCredentialOptions {
oauth2?: IOAuth2Options;
@ -900,6 +899,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
// TODO: Create later own type only for Config-Nodes
export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
BaseExecutionFunctions & {
/** This executes sub-workflows */
executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
@ -2241,17 +2241,6 @@ export interface IWorkflowCredentials {
};
}
export interface IWorkflowExecuteHooks {
[key: string]: Array<(...args: any[]) => Promise<void>> | undefined;
nodeExecuteAfter?: Array<
(nodeName: string, data: ITaskData, executionData: IRunExecutionData) => Promise<void>
>;
nodeExecuteBefore?: Array<(nodeName: string) => Promise<void>>;
workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise<void>>;
workflowExecuteBefore?: Array<(workflow?: Workflow, data?: IRunExecutionData) => Promise<void>>;
sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise<void>>;
}
export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
@ -2324,14 +2313,13 @@ type AiEventPayload = {
export interface IWorkflowExecuteAdditionalData {
credentialsHelper: ICredentialsHelper;
executeWorkflow: (
executeSubWorkflow: (
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
) => Promise<ExecuteWorkflowData>;
executionId?: string;
restartExecutionId?: string;
hooks?: WorkflowHooks;
httpResponse?: express.Response;
httpRequest?: express.Request;
restApiUrl: string;
@ -2388,11 +2376,6 @@ export type WorkflowActivateMode =
| 'manual' // unused
| 'leadershipChange';
export interface IWorkflowHooksOptionalParameters {
retryOf?: string;
pushRef?: string;
}
export namespace WorkflowSettings {
export type CallerPolicy = 'any' | 'none' | 'workflowsFromAList' | 'workflowsFromSameOwner';
export type SaveDataExecution = 'DEFAULT' | 'all' | 'none';

View file

@ -1,49 +0,0 @@
import type {
IWorkflowBase,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
WorkflowExecuteMode,
} from './Interfaces';
export class WorkflowHooks {
mode: WorkflowExecuteMode;
workflowData: IWorkflowBase;
executionId: string;
pushRef?: string;
retryOf?: string;
hookFunctions: IWorkflowExecuteHooks;
constructor(
hookFunctions: IWorkflowExecuteHooks,
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters,
) {
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
optionalParameters = optionalParameters || {};
this.hookFunctions = hookFunctions;
this.mode = mode;
this.executionId = executionId;
this.workflowData = workflowData;
this.pushRef = optionalParameters.pushRef;
// retryOf might be `null` from TypeORM
this.retryOf = optionalParameters.retryOf ?? undefined;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async executeHookFunctions(hookName: string, parameters: any[]) {
const hooks = this.hookFunctions[hookName];
if (hooks !== undefined && Array.isArray(hooks)) {
for (const hookFunction of hooks) {
await hookFunction.apply(this, parameters);
}
}
}
}

View file

@ -17,7 +17,6 @@ export * from './NodeHelpers';
export * from './Workflow';
export * from './WorkflowDataProxy';
export * from './WorkflowDataProxyEnvProvider';
export * from './WorkflowHooks';
export * from './VersionedNodeType';
export * from './TypeValidation';
export * from './result';