diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index b474d0bbd6..5692bb92ce 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -613,7 +613,7 @@ export class ActiveWorkflowRunner { /** * Return poll function which gets the global functions from n8n-core - * and overwrites the __emit to be able to start it in subprocess + * and overwrites the emit to be able to start it in subprocess * */ getExecutePollFunctions( @@ -630,19 +630,38 @@ export class ActiveWorkflowRunner { mode, activation, ); - // eslint-disable-next-line no-underscore-dangle - returnFunctions.__emit = async ( - data: INodeExecutionData[][] | ExecutionError, - ): Promise => { - if (data instanceof Error) { - await createErrorExecution(data, node, workflowData, workflow, mode); - this.executeErrorWorkflow(data, workflowData, mode); - return; - } + returnFunctions.__emit = ( + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, + ): void => { // eslint-disable-next-line @typescript-eslint/restrict-template-expressions Logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); WorkflowHelpers.saveStaticData(workflow); - this.runWorkflow(workflowData, node, data, additionalData, mode); + const executePromise = this.runWorkflow( + workflowData, + node, + data, + additionalData, + mode, + responsePromise, + ); + + if (donePromise) { + executePromise.then((executionId) => { + activeExecutions + .getPostExecutePromise(executionId) + .then(donePromise.resolve) + .catch(donePromise.reject); + }); + } else { + executePromise.catch(console.error); + } + }; + + returnFunctions.__emitError = async (error: ExecutionError): Promise => { + await createErrorExecution(error, node, workflowData, workflow, mode); + this.executeErrorWorkflow(error, workflowData, mode); }; return returnFunctions; }; diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index c37eeba1cd..344ada4067 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -160,7 +160,6 @@ export class ActiveWorkflows { const pollResponse = await workflow.runPoll(node, pollFunctions); if (pollResponse !== null) { - // eslint-disable-next-line no-underscore-dangle pollFunctions.__emit(pollResponse); } } catch (error) { @@ -170,8 +169,7 @@ export class ActiveWorkflows { if (testingTrigger) { throw error; } - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, no-underscore-dangle - pollFunctions.__emit(error); + pollFunctions.__emitError(error); } }; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 78ec4ebc24..f972954a19 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -1874,7 +1874,12 @@ export function getExecutePollFunctions( return ((workflow: Workflow, node: INode) => { return { __emit: (data: INodeExecutionData[][]): void => { - throw new Error('Overwrite NodeExecuteFunctions.getExecutePullFunctions.__emit function!'); + throw new Error('Overwrite NodeExecuteFunctions.getExecutePollFunctions.__emit function!'); + }, + __emitError(error: Error) { + throw new Error( + 'Overwrite NodeExecuteFunctions.getExecutePollFunctions.__emitError function!', + ); }, async getCredentials(type: string): Promise { return getCredentials(workflow, node, type, additionalData, mode); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 0ff80ad866..b93d34bbf8 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -720,7 +720,12 @@ export interface IHookFunctions { } export interface IPollFunctions { - __emit(data: INodeExecutionData[][] | NodeApiError): void; + __emit( + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, + ): void; + __emitError(error: Error, responsePromise?: IDeferredPromise): void; getCredentials(type: string): Promise; getMode(): WorkflowExecuteMode; getActivationMode(): WorkflowActivateMode;