stop passing pushRef and retryOf around

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2025-01-30 14:30:52 +01:00
parent 50b45cff3a
commit e833b1b1a3
No known key found for this signature in database
6 changed files with 60 additions and 86 deletions

View file

@ -201,8 +201,6 @@ describe('Execution Lifecycle Hooks', () => {
expect(hooks.mode).toBe('manual'); expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId); expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData); expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toEqual('test-push-ref');
expect(hooks.retryOf).toEqual('test-retry-of');
const { hookFunctions } = hooks; const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); expect(hookFunctions.nodeExecuteBefore).toHaveLength(2);
@ -260,7 +258,7 @@ describe('Execution Lifecycle Hooks', () => {
workflowData.settings = { saveExecutionProgress: false }; workflowData.settings = { saveExecutionProgress: false };
hooks = createHooks(); hooks = createHooks();
expect(hooks.hookFunctions.nodeExecuteAfter).toHaveLength(3); expect(hooks.hookFunctions.nodeExecuteAfter).toHaveLength(2);
await hooks.executeHookFunctions('nodeExecuteAfter', [ await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName, nodeName,
@ -509,10 +507,16 @@ describe('Execution Lifecycle Hooks', () => {
describe("when pushRef isn't set", () => { describe("when pushRef isn't set", () => {
beforeEach(() => { beforeEach(() => {
hooks = getWorkflowHooksMain({ executionMode, workflowData }, executionId); hooks = getWorkflowHooksMain({ executionMode, workflowData, retryOf }, executionId);
}); });
it('should not send any push events', async () => { it('should not setup any push hooks', async () => {
const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(1);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(1);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(2);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(4);
await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]);
await hooks.executeHookFunctions('nodeExecuteAfter', [ await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName, nodeName,
@ -543,8 +547,6 @@ describe('Execution Lifecycle Hooks', () => {
expect(hooks.mode).toBe('manual'); expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId); expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData); expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toEqual('test-push-ref');
expect(hooks.retryOf).toEqual('test-retry-of');
const { hookFunctions } = hooks; const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(0); expect(hookFunctions.nodeExecuteBefore).toHaveLength(0);
@ -621,8 +623,6 @@ describe('Execution Lifecycle Hooks', () => {
expect(hooks.mode).toBe('manual'); expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId); expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData); expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toEqual('test-push-ref');
expect(hooks.retryOf).toEqual('test-retry-of');
const { hookFunctions } = hooks; const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); expect(hookFunctions.nodeExecuteBefore).toHaveLength(2);
@ -718,8 +718,6 @@ describe('Execution Lifecycle Hooks', () => {
expect(hooks.mode).toBe('manual'); expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId); expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData); expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toBeUndefined();
expect(hooks.retryOf).toBeUndefined();
const { hookFunctions } = hooks; const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(1); expect(hookFunctions.nodeExecuteBefore).toHaveLength(1);

View file

@ -16,13 +16,12 @@ afterEach(() => {
jest.clearAllMocks(); jest.clearAllMocks();
}); });
const commonArgs: [string, string, string, ITaskData, IRunExecutionData, string] = [ const commonArgs: [string, string, string, ITaskData, IRunExecutionData] = [
'some-workflow-id', 'some-workflow-id',
'some-execution-id', 'some-execution-id',
'My Node', 'My Node',
{} as ITaskData, {} as ITaskData,
{} as IRunExecutionData, {} as IRunExecutionData,
'some-session-id',
]; ];
test('should ignore on leftover async call', async () => { test('should ignore on leftover async call', async () => {

View file

@ -10,7 +10,6 @@ import type {
ITaskData, ITaskData,
IWorkflowBase, IWorkflowBase,
IWorkflowExecuteHooks, IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
WorkflowExecuteMode, WorkflowExecuteMode,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
Workflow, Workflow,
@ -34,6 +33,12 @@ import {
} from './shared/shared-hook-functions'; } from './shared/shared-hook-functions';
import { type ExecutionSavingSettings, toSaveSettings } from './to-save-settings'; import { type ExecutionSavingSettings, toSaveSettings } from './to-save-settings';
type HooksSetupParameters = {
saveSettings: ExecutionSavingSettings;
pushRef?: string;
retryOf?: string;
};
function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks { function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks {
const result: IWorkflowExecuteHooks = { const result: IWorkflowExecuteHooks = {
nodeExecuteBefore: [], nodeExecuteBefore: [],
@ -91,19 +96,16 @@ function hookFunctionsNodeEvents(): IWorkflowExecuteHooks {
/** /**
* Returns hook functions to push data to Editor-UI * Returns hook functions to push data to Editor-UI
*/ */
function hookFunctionsPush(): IWorkflowExecuteHooks { function hookFunctionsPush({ pushRef, retryOf }: HooksSetupParameters): IWorkflowExecuteHooks {
if (!pushRef) return {};
const logger = Container.get(Logger); const logger = Container.get(Logger);
const pushInstance = Container.get(Push); const pushInstance = Container.get(Push);
return { return {
nodeExecuteBefore: [ nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> { async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { pushRef, executionId } = this; const { executionId } = this;
// Push data to session which started workflow before each // Push data to session which started workflow before each
// node which starts rendering // node which starts rendering
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId, executionId,
pushRef, pushRef,
@ -115,12 +117,8 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
], ],
nodeExecuteAfter: [ nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> { async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
const { pushRef, executionId } = this; const { executionId } = this;
// Push data to session which started workflow after each rendered node // Push data to session which started workflow after each rendered node
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId, executionId,
pushRef, pushRef,
@ -135,7 +133,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
], ],
workflowExecuteBefore: [ workflowExecuteBefore: [
async function (this: WorkflowHooks, _workflow, data): Promise<void> { async function (this: WorkflowHooks, _workflow, data): Promise<void> {
const { pushRef, executionId } = this; const { executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData; const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', { logger.debug('Executing hook (hookFunctionsPush)', {
executionId, executionId,
@ -143,9 +141,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId, workflowId,
}); });
// Push data to session which started the workflow // Push data to session which started the workflow
if (pushRef === undefined) {
return;
}
pushInstance.send( pushInstance.send(
{ {
type: 'executionStarted', type: 'executionStarted',
@ -153,7 +148,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
executionId, executionId,
mode: this.mode, mode: this.mode,
startedAt: new Date(), startedAt: new Date(),
retryOf: this.retryOf, retryOf,
workflowId, workflowId,
workflowName, workflowName,
flattedRunData: data?.resultData.runData flattedRunData: data?.resultData.runData
@ -167,9 +162,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
], ],
workflowExecuteAfter: [ workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> { async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { pushRef, executionId } = this; const { executionId } = this;
if (pushRef === undefined) return;
const { id: workflowId } = this.workflowData; const { id: workflowId } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', { logger.debug('Executing hook (hookFunctionsPush)', {
executionId, executionId,
@ -212,7 +205,7 @@ function hookFunctionsExternalHooks(): IWorkflowExecuteHooks {
}; };
} }
function hookFunctionsSaveProgress(saveSettings: ExecutionSavingSettings): IWorkflowExecuteHooks { function hookFunctionsSaveProgress({ saveSettings }: HooksSetupParameters): IWorkflowExecuteHooks {
if (!saveSettings.progress) return {}; if (!saveSettings.progress) return {};
return { return {
nodeExecuteAfter: [ nodeExecuteAfter: [
@ -228,7 +221,6 @@ function hookFunctionsSaveProgress(saveSettings: ExecutionSavingSettings): IWork
nodeName, nodeName,
data, data,
executionData, executionData,
this.pushRef,
); );
}, },
], ],
@ -249,7 +241,11 @@ function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks {
/** /**
* Returns hook functions to save workflow execution and call error workflow * Returns hook functions to save workflow execution and call error workflow
*/ */
function hookFunctionsSave(saveSettings: ExecutionSavingSettings): IWorkflowExecuteHooks { function hookFunctionsSave({
pushRef,
retryOf,
saveSettings,
}: HooksSetupParameters): IWorkflowExecuteHooks {
const logger = Container.get(Logger); const logger = Container.get(Logger);
const errorReporter = Container.get(ErrorReporter); const errorReporter = Container.get(ErrorReporter);
const executionRepository = Container.get(ExecutionRepository); const executionRepository = Container.get(ExecutionRepository);
@ -314,7 +310,7 @@ function hookFunctionsSave(saveSettings: ExecutionSavingSettings): IWorkflowExec
fullRunData, fullRunData,
this.mode, this.mode,
this.executionId, this.executionId,
this.retryOf, retryOf,
); );
await executionRepository.hardDelete({ await executionRepository.hardDelete({
@ -331,12 +327,12 @@ function hookFunctionsSave(saveSettings: ExecutionSavingSettings): IWorkflowExec
runData: fullRunData, runData: fullRunData,
workflowData: this.workflowData, workflowData: this.workflowData,
workflowStatusFinal: fullRunData.status, workflowStatusFinal: fullRunData.status,
retryOf: this.retryOf, retryOf,
}); });
// When going into the waiting state, store the pushRef in the execution-data // When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) { if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef; fullExecutionData.data.pushRef = pushRef;
} }
await updateExistingExecution({ await updateExistingExecution({
@ -351,7 +347,7 @@ function hookFunctionsSave(saveSettings: ExecutionSavingSettings): IWorkflowExec
fullRunData, fullRunData,
this.mode, this.mode,
this.executionId, this.executionId,
this.retryOf, retryOf,
); );
} }
} finally { } finally {
@ -375,7 +371,10 @@ function hookFunctionsSave(saveSettings: ExecutionSavingSettings): IWorkflowExec
* for running with queues. Manual executions should never run on queues as * for running with queues. Manual executions should never run on queues as
* they are always executed in the main process. * they are always executed in the main process.
*/ */
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { function hookFunctionsSaveWorker({
pushRef,
retryOf,
}: HooksSetupParameters): IWorkflowExecuteHooks {
const logger = Container.get(Logger); const logger = Container.get(Logger);
const errorReporter = Container.get(ErrorReporter); const errorReporter = Container.get(ErrorReporter);
const workflowStaticDataService = Container.get(WorkflowStaticDataService); const workflowStaticDataService = Container.get(WorkflowStaticDataService);
@ -407,7 +406,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
logger.error( logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`,
{ pushRef: this.pushRef, workflowId: this.workflowData.id }, { workflowId: this.workflowData.id },
); );
} }
} }
@ -422,7 +421,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
fullRunData, fullRunData,
this.mode, this.mode,
this.executionId, this.executionId,
this.retryOf, retryOf,
); );
} }
@ -432,12 +431,12 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
runData: fullRunData, runData: fullRunData,
workflowData: this.workflowData, workflowData: this.workflowData,
workflowStatusFinal: fullRunData.status, workflowStatusFinal: fullRunData.status,
retryOf: this.retryOf, retryOf,
}); });
// When going into the waiting state, store the pushRef in the execution-data // When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) { if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef; fullExecutionData.data.pushRef = pushRef;
} }
await updateExistingExecution({ await updateExistingExecution({
@ -476,8 +475,8 @@ export function getWorkflowHooksIntegrated(
hookFunctionsWorkflowEvents(userId), hookFunctionsWorkflowEvents(userId),
hookFunctionsNodeEvents(), hookFunctionsNodeEvents(),
hookFunctionsFinalizeExecutionStatus(), hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSave(saveSettings), hookFunctionsSave({ saveSettings }),
hookFunctionsSaveProgress(saveSettings), hookFunctionsSaveProgress({ saveSettings }),
hookFunctionsExternalHooks(), hookFunctionsExternalHooks(),
); );
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
@ -490,23 +489,24 @@ export function getWorkflowHooksWorkerExecuter(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
executionId: string, executionId: string,
workflowData: IWorkflowBase, workflowData: IWorkflowBase,
optionalParameters: IWorkflowHooksOptionalParameters = {}, { pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): WorkflowHooks { ): WorkflowHooks {
const saveSettings = toSaveSettings(workflowData.settings); const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const toMerge = [ const toMerge = [
hookFunctionsNodeEvents(), hookFunctionsNodeEvents(),
hookFunctionsFinalizeExecutionStatus(), hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSaveWorker(), hookFunctionsSaveWorker(optionalParameters),
hookFunctionsSaveProgress(saveSettings), hookFunctionsSaveProgress(optionalParameters),
hookFunctionsExternalHooks(), hookFunctionsExternalHooks(),
]; ];
if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { if (mode === 'manual' && Container.get(InstanceSettings).isWorker) {
toMerge.push(hookFunctionsPush()); toMerge.push(hookFunctionsPush(optionalParameters));
} }
const hookFunctions = mergeHookFunctions(...toMerge); const hookFunctions = mergeHookFunctions(...toMerge);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
} }
/** /**
@ -516,13 +516,14 @@ export function getWorkflowHooksWorkerMain(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
executionId: string, executionId: string,
workflowData: IWorkflowBase, workflowData: IWorkflowBase,
optionalParameters: IWorkflowHooksOptionalParameters = {}, { pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): WorkflowHooks { ): WorkflowHooks {
const saveSettings = toSaveSettings(workflowData.settings); const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const executionRepository = Container.get(ExecutionRepository); const executionRepository = Container.get(ExecutionRepository);
const hookFunctions = mergeHookFunctions( const hookFunctions = mergeHookFunctions(
hookFunctionsWorkflowEvents(), hookFunctionsWorkflowEvents(),
hookFunctionsSaveProgress(saveSettings), hookFunctionsSaveProgress(optionalParameters),
hookFunctionsExternalHooks(), hookFunctionsExternalHooks(),
hookFunctionsFinalizeExecutionStatus(), hookFunctionsFinalizeExecutionStatus(),
{ {
@ -569,7 +570,7 @@ export function getWorkflowHooksWorkerMain(
hookFunctions.nodeExecuteBefore = []; hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = []; hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
} }
/** /**
@ -579,18 +580,17 @@ export function getWorkflowHooksMain(
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
executionId: string, executionId: string,
): WorkflowHooks { ): WorkflowHooks {
const { pushRef, retryOf } = data;
const saveSettings = toSaveSettings(data.workflowData.settings); const saveSettings = toSaveSettings(data.workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const hookFunctions = mergeHookFunctions( const hookFunctions = mergeHookFunctions(
hookFunctionsWorkflowEvents(), hookFunctionsWorkflowEvents(),
hookFunctionsNodeEvents(), hookFunctionsNodeEvents(),
hookFunctionsFinalizeExecutionStatus(), hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSave(saveSettings), hookFunctionsSave(optionalParameters),
hookFunctionsPush(), hookFunctionsPush(optionalParameters),
hookFunctionsSaveProgress(saveSettings), hookFunctionsSaveProgress(optionalParameters),
hookFunctionsExternalHooks(), hookFunctionsExternalHooks(),
); );
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData);
pushRef: data.pushRef,
retryOf: data.retryOf as string,
});
} }

View file

@ -10,7 +10,6 @@ export async function saveExecutionProgress(
nodeName: string, nodeName: string,
data: ITaskData, data: ITaskData,
executionData: IRunExecutionData, executionData: IRunExecutionData,
pushRef?: string,
) { ) {
const logger = Container.get(Logger); const logger = Container.get(Logger);
@ -90,7 +89,6 @@ export async function saveExecutionProgress(
{ {
...error, ...error,
executionId, executionId,
pushRef,
workflowId, workflowId,
}, },
); );

View file

@ -2409,11 +2409,6 @@ export type WorkflowActivateMode =
| 'manual' // unused | 'manual' // unused
| 'leadershipChange'; | 'leadershipChange';
export interface IWorkflowHooksOptionalParameters {
retryOf?: string;
pushRef?: string;
}
export namespace WorkflowSettings { export namespace WorkflowSettings {
export type CallerPolicy = 'any' | 'none' | 'workflowsFromAList' | 'workflowsFromSameOwner'; export type CallerPolicy = 'any' | 'none' | 'workflowsFromAList' | 'workflowsFromSameOwner';
export type SaveDataExecution = 'DEFAULT' | 'all' | 'none'; export type SaveDataExecution = 'DEFAULT' | 'all' | 'none';

View file

@ -1,9 +1,4 @@
import type { import type { IWorkflowBase, IWorkflowExecuteHooks, WorkflowExecuteMode } from './Interfaces';
IWorkflowBase,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
WorkflowExecuteMode,
} from './Interfaces';
export class WorkflowHooks { export class WorkflowHooks {
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
@ -12,10 +7,6 @@ export class WorkflowHooks {
executionId: string; executionId: string;
pushRef?: string;
retryOf?: string;
hookFunctions: IWorkflowExecuteHooks; hookFunctions: IWorkflowExecuteHooks;
constructor( constructor(
@ -23,18 +14,11 @@ export class WorkflowHooks {
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
executionId: string, executionId: string,
workflowData: IWorkflowBase, workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters,
) { ) {
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
optionalParameters = optionalParameters || {};
this.hookFunctions = hookFunctions; this.hookFunctions = hookFunctions;
this.mode = mode; this.mode = mode;
this.executionId = executionId; this.executionId = executionId;
this.workflowData = workflowData; this.workflowData = workflowData;
this.pushRef = optionalParameters.pushRef;
// retryOf might be `null` from TypeORM
this.retryOf = optionalParameters.retryOf ?? undefined;
} }
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any