fix(core): Ensure that 'workflow-post-execute' event has userId whenever it's available (#13326)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2025-02-18 10:23:57 +01:00 committed by GitHub
parent e3b7243377
commit f41e353887
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 49 additions and 63 deletions

View file

@ -88,6 +88,7 @@ describe('Execution Lifecycle Hooks', () => {
const expressionError = new ExpressionError('Error'); const expressionError = new ExpressionError('Error');
const pushRef = 'test-push-ref'; const pushRef = 'test-push-ref';
const retryOf = 'test-retry-of'; const retryOf = 'test-retry-of';
const userId = 'test-user-id';
const now = new Date('2025-01-13T18:25:50.267Z'); const now = new Date('2025-01-13T18:25:50.267Z');
jest.useFakeTimers({ now }); jest.useFakeTimers({ now });
@ -110,7 +111,7 @@ describe('Execution Lifecycle Hooks', () => {
}; };
}); });
const workflowEventTests = () => { const workflowEventTests = (expectedUserId?: string) => {
describe('workflowExecuteBefore', () => { describe('workflowExecuteBefore', () => {
it('should emit workflow-pre-execute events', async () => { it('should emit workflow-pre-execute events', async () => {
await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]);
@ -130,6 +131,7 @@ describe('Execution Lifecycle Hooks', () => {
executionId, executionId,
runData: successfulRun, runData: successfulRun,
workflow: workflowData, workflow: workflowData,
userId: expectedUserId,
}); });
}); });
@ -214,7 +216,7 @@ describe('Execution Lifecycle Hooks', () => {
describe('getLifecycleHooksForRegularMain', () => { describe('getLifecycleHooksForRegularMain', () => {
const createHooks = (executionMode: WorkflowExecuteMode = 'manual') => const createHooks = (executionMode: WorkflowExecuteMode = 'manual') =>
getLifecycleHooksForRegularMain( getLifecycleHooksForRegularMain(
{ executionMode, workflowData, pushRef, retryOf }, { executionMode, workflowData, pushRef, retryOf, userId },
executionId, executionId,
); );
@ -222,7 +224,7 @@ describe('Execution Lifecycle Hooks', () => {
lifecycleHooks = createHooks(); lifecycleHooks = createHooks();
}); });
workflowEventTests(); workflowEventTests(userId);
nodeEventsTests(); nodeEventsTests();
externalHooksTests(); externalHooksTests();
statisticsTests(); statisticsTests();
@ -527,13 +529,19 @@ describe('Execution Lifecycle Hooks', () => {
describe('getLifecycleHooksForScalingMain', () => { describe('getLifecycleHooksForScalingMain', () => {
beforeEach(() => { beforeEach(() => {
lifecycleHooks = getLifecycleHooksForScalingMain('manual', executionId, workflowData, { lifecycleHooks = getLifecycleHooksForScalingMain(
pushRef, {
retryOf, executionMode: 'manual',
}); workflowData,
pushRef,
retryOf,
userId,
},
executionId,
);
}); });
workflowEventTests(); workflowEventTests(userId);
externalHooksTests(); externalHooksTests();
it('should setup the correct set of hooks', () => { it('should setup the correct set of hooks', () => {
@ -566,13 +574,13 @@ describe('Execution Lifecycle Hooks', () => {
saveDataErrorExecution: 'all', saveDataErrorExecution: 'all',
}; };
const lifecycleHooks = getLifecycleHooksForScalingMain( const lifecycleHooks = getLifecycleHooksForScalingMain(
'webhook',
executionId,
workflowData,
{ {
executionMode: 'webhook',
workflowData,
pushRef, pushRef,
retryOf, retryOf,
}, },
executionId,
); );
await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]);
@ -589,13 +597,13 @@ describe('Execution Lifecycle Hooks', () => {
saveDataErrorExecution: 'none', saveDataErrorExecution: 'none',
}; };
const lifecycleHooks = getLifecycleHooksForScalingMain( const lifecycleHooks = getLifecycleHooksForScalingMain(
'webhook',
executionId,
workflowData,
{ {
executionMode: 'webhook',
workflowData,
pushRef, pushRef,
retryOf, retryOf,
}, },
executionId,
); );
await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]);
@ -610,10 +618,10 @@ describe('Execution Lifecycle Hooks', () => {
describe('getLifecycleHooksForScalingWorker', () => { describe('getLifecycleHooksForScalingWorker', () => {
const createHooks = (executionMode: WorkflowExecuteMode = 'manual') => const createHooks = (executionMode: WorkflowExecuteMode = 'manual') =>
getLifecycleHooksForScalingWorker(executionMode, executionId, workflowData, { getLifecycleHooksForScalingWorker(
pushRef, { executionMode, workflowData, pushRef, retryOf },
retryOf, executionId,
}); );
beforeEach(() => { beforeEach(() => {
lifecycleHooks = createHooks(); lifecycleHooks = createHooks();

View file

@ -376,14 +376,13 @@ export function getLifecycleHooksForSubExecutions(
* Returns ExecutionLifecycleHooks instance for worker in scaling mode. * Returns ExecutionLifecycleHooks instance for worker in scaling mode.
*/ */
export function getLifecycleHooksForScalingWorker( export function getLifecycleHooksForScalingWorker(
mode: WorkflowExecuteMode, data: IWorkflowExecutionDataProcess,
executionId: string, executionId: string,
workflowData: IWorkflowBase,
{ pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): ExecutionLifecycleHooks { ): ExecutionLifecycleHooks {
const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); const { pushRef, retryOf, executionMode, workflowData } = data;
const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData);
const saveSettings = toSaveSettings(workflowData.settings); const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings }; const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
hookFunctionsNodeEvents(hooks); hookFunctionsNodeEvents(hooks);
hookFunctionsFinalizeExecutionStatus(hooks); hookFunctionsFinalizeExecutionStatus(hooks);
hookFunctionsSaveWorker(hooks, optionalParameters); hookFunctionsSaveWorker(hooks, optionalParameters);
@ -391,7 +390,7 @@ export function getLifecycleHooksForScalingWorker(
hookFunctionsStatistics(hooks); hookFunctionsStatistics(hooks);
hookFunctionsExternalHooks(hooks); hookFunctionsExternalHooks(hooks);
if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { if (executionMode === 'manual' && Container.get(InstanceSettings).isWorker) {
hookFunctionsPush(hooks, optionalParameters); hookFunctionsPush(hooks, optionalParameters);
} }
@ -402,17 +401,16 @@ export function getLifecycleHooksForScalingWorker(
* Returns ExecutionLifecycleHooks instance for main process if workflow runs via worker * Returns ExecutionLifecycleHooks instance for main process if workflow runs via worker
*/ */
export function getLifecycleHooksForScalingMain( export function getLifecycleHooksForScalingMain(
mode: WorkflowExecuteMode, data: IWorkflowExecutionDataProcess,
executionId: string, executionId: string,
workflowData: IWorkflowBase,
{ pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): ExecutionLifecycleHooks { ): ExecutionLifecycleHooks {
const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); const { pushRef, retryOf, executionMode, workflowData, userId } = data;
const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData);
const saveSettings = toSaveSettings(workflowData.settings); const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings }; const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
const executionRepository = Container.get(ExecutionRepository); const executionRepository = Container.get(ExecutionRepository);
hookFunctionsWorkflowEvents(hooks); hookFunctionsWorkflowEvents(hooks, userId);
hookFunctionsSaveProgress(hooks, optionalParameters); hookFunctionsSaveProgress(hooks, optionalParameters);
hookFunctionsExternalHooks(hooks); hookFunctionsExternalHooks(hooks);
hookFunctionsFinalizeExecutionStatus(hooks); hookFunctionsFinalizeExecutionStatus(hooks);
@ -466,11 +464,11 @@ export function getLifecycleHooksForRegularMain(
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
executionId: string, executionId: string,
): ExecutionLifecycleHooks { ): ExecutionLifecycleHooks {
const { pushRef, retryOf, executionMode, workflowData } = data; const { pushRef, retryOf, executionMode, workflowData, userId } = data;
const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData); const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData);
const saveSettings = toSaveSettings(workflowData.settings); const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings }; const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
hookFunctionsWorkflowEvents(hooks); hookFunctionsWorkflowEvents(hooks, userId);
hookFunctionsNodeEvents(hooks); hookFunctionsNodeEvents(hooks);
hookFunctionsFinalizeExecutionStatus(hooks); hookFunctionsFinalizeExecutionStatus(hooks);
hookFunctionsSave(hooks, optionalParameters); hookFunctionsSave(hooks, optionalParameters);

View file

@ -132,10 +132,13 @@ export class JobProcessor {
const { pushRef } = job.data; const { pushRef } = job.data;
const lifecycleHooks = getLifecycleHooksForScalingWorker( const lifecycleHooks = getLifecycleHooksForScalingWorker(
execution.mode, {
job.data.executionId, executionMode: execution.mode,
execution.workflowData, workflowData: execution.workflowData,
{ retryOf: execution.retryOf ?? undefined, pushRef }, retryOf: execution.retryOf,
pushRef,
},
executionId,
); );
additionalData.hooks = lifecycleHooks; additionalData.hooks = lifecycleHooks;

View file

@ -345,14 +345,7 @@ export class WorkflowRunner {
try { try {
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
lifecycleHooks = getLifecycleHooksForScalingMain( lifecycleHooks = getLifecycleHooksForScalingMain(data, executionId);
data.executionMode,
executionId,
data.workflowData,
{
retryOf: data.retryOf ?? undefined,
},
);
// Normally also workflow should be supplied here but as it only used for sending // Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed. // data to editor-UI is not needed.
@ -360,12 +353,7 @@ export class WorkflowRunner {
} catch (error) { } catch (error) {
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require. // "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker( const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
data.executionMode,
executionId,
data.workflowData,
{ retryOf: data.retryOf ?? undefined },
);
await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks); await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks);
throw error; throw error;
} }
@ -378,13 +366,7 @@ export class WorkflowRunner {
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require. // "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker( const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
data.executionMode,
executionId,
data.workflowData,
{ retryOf: data.retryOf ?? undefined },
);
const error = new ExecutionCancelledError(executionId); const error = new ExecutionCancelledError(executionId);
await this.processError( await this.processError(
error, error,
@ -409,12 +391,7 @@ export class WorkflowRunner {
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require. // "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker( const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
data.executionMode,
executionId,
data.workflowData,
{ retryOf: data.retryOf ?? undefined },
);
await this.processError( await this.processError(
error, error,