refactor(core): Refactor execution post-execute promises (no-changelog) (#10809)

Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-23 12:08:57 +02:00 committed by GitHub
parent 521bbfeee5
commit 67fb6d6fdd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 65 additions and 94 deletions

View file

@ -94,10 +94,17 @@ describe('ActiveExecutions', () => {
}); });
test('Should remove an existing execution', async () => { test('Should remove an existing execution', async () => {
// ARRANGE
const newExecution = mockExecutionData(); const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution); const executionId = await activeExecutions.add(newExecution);
activeExecutions.remove(executionId);
// ACT
activeExecutions.finalizeExecution(executionId);
// Wait until the next tick to ensure that the post-execution promise has settled
await new Promise(setImmediate);
// ASSERT
expect(activeExecutions.getActiveExecutions().length).toBe(0); expect(activeExecutions.getActiveExecutions().length).toBe(0);
}); });
@ -110,7 +117,7 @@ describe('ActiveExecutions', () => {
setTimeout(res, 100); setTimeout(res, 100);
}); });
const fakeOutput = mockFullRunData(); const fakeOutput = mockFullRunData();
activeExecutions.remove(executionId, fakeOutput); activeExecutions.finalizeExecution(executionId, fakeOutput);
await expect(postExecutePromise).resolves.toEqual(fakeOutput); await expect(postExecutePromise).resolves.toEqual(fakeOutput);
}); });
@ -126,7 +133,7 @@ describe('ActiveExecutions', () => {
const cancellablePromise = mockCancelablePromise(); const cancellablePromise = mockCancelablePromise();
cancellablePromise.cancel = cancelExecution; cancellablePromise.cancel = cancelExecution;
activeExecutions.attachWorkflowExecution(executionId, cancellablePromise); activeExecutions.attachWorkflowExecution(executionId, cancellablePromise);
void activeExecutions.stopExecution(executionId); activeExecutions.stopExecution(executionId);
expect(cancelExecution).toHaveBeenCalledTimes(1); expect(cancelExecution).toHaveBeenCalledTimes(1);
}); });

View file

@ -1,6 +1,7 @@
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow'; import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
import Container from 'typedi'; import Container from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config'; import config from '@/config';
import type { User } from '@/databases/entities/user'; import type { User } from '@/databases/entities/user';
import { Telemetry } from '@/telemetry'; import { Telemetry } from '@/telemetry';
@ -72,6 +73,10 @@ test('processError should process error', async () => {
}, },
workflow, workflow,
); );
await Container.get(ActiveExecutions).add(
{ executionMode: 'webhook', workflowData: workflow },
execution.id,
);
config.set('executions.mode', 'regular'); config.set('executions.mode', 'regular');
await runner.processError( await runner.processError(
new Error('test') as ExecutionError, new Error('test') as ExecutionError,

View file

@ -5,17 +5,13 @@ import type {
ExecutionStatus, ExecutionStatus,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow';
ApplicationError,
createDeferredPromise,
ExecutionCancelledError,
sleep,
} from 'n8n-workflow';
import { strict as assert } from 'node:assert'; import { strict as assert } from 'node:assert';
import type PCancelable from 'p-cancelable'; import type PCancelable from 'p-cancelable';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import type { import type {
ExecutionPayload, ExecutionPayload,
IExecutingWorkflowData, IExecutingWorkflowData,
@ -95,13 +91,29 @@ export class ActiveExecutions {
await this.executionRepository.updateExistingExecution(executionId, execution); await this.executionRepository.updateExistingExecution(executionId, execution);
} }
const postExecutePromise = createDeferredPromise<IRun | undefined>();
this.activeExecutions[executionId] = { this.activeExecutions[executionId] = {
executionData, executionData,
startedAt: new Date(), startedAt: new Date(),
postExecutePromises: [], postExecutePromise,
status: executionStatus, status: executionStatus,
}; };
// Automatically remove execution once the postExecutePromise settles
void postExecutePromise.promise
.catch((error) => {
if (error instanceof ExecutionCancelledError) return;
throw error;
})
.finally(() => {
this.concurrencyControl.release({ mode: executionData.executionMode });
delete this.activeExecutions[executionId];
this.logger.debug('Execution removed', { executionId });
});
this.logger.debug('Execution added', { executionId });
return executionId; return executionId;
} }
@ -125,68 +137,30 @@ export class ActiveExecutions {
execution?.responsePromise?.resolve(response); execution?.responsePromise?.resolve(response);
} }
getPostExecutePromiseCount(executionId: string): number { /** Cancel the execution promise and reject its post-execution promise. */
return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0;
}
/**
* Remove an active execution
*/
remove(executionId: string, fullRunData?: IRun): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
return;
}
// Resolve all the waiting promises
for (const promise of execution.postExecutePromises) {
promise.resolve(fullRunData);
}
this.postExecuteCleanup(executionId);
}
/**
* Forces an execution to stop
*/
stopExecution(executionId: string): void { stopExecution(executionId: string): void {
const execution = this.activeExecutions[executionId]; const execution = this.activeExecutions[executionId];
if (execution === undefined) { if (execution === undefined) {
// There is no execution running with that id // There is no execution running with that id
return; return;
} }
execution.workflowExecution?.cancel();
execution.workflowExecution!.cancel(); execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
this.logger.debug('Execution cancelled', { executionId });
// Reject all the waiting promises
const reason = new ExecutionCancelledError(executionId);
for (const promise of execution.postExecutePromises) {
promise.reject(reason);
} }
this.postExecuteCleanup(executionId); /** Resolve the post-execution promise in an execution. */
} finalizeExecution(executionId: string, fullRunData?: IRun) {
const execution = this.getExecution(executionId);
private postExecuteCleanup(executionId: string) { execution.postExecutePromise.resolve(fullRunData);
const execution = this.activeExecutions[executionId]; this.logger.debug('Execution finalized', { executionId });
if (execution === undefined) {
return;
}
// Remove from the list of active executions
delete this.activeExecutions[executionId];
this.concurrencyControl.release({ mode: execution.executionData.executionMode });
} }
/** /**
* Returns a promise which will resolve with the data of the execution with the given id * Returns a promise which will resolve with the data of the execution with the given id
*/ */
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> { async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
// Create the promise which will be resolved when the execution finished return await this.getExecution(executionId).postExecutePromise.promise;
const waitPromise = createDeferredPromise<IRun | undefined>();
this.getExecution(executionId).postExecutePromises.push(waitPromise);
return await waitPromise.promise;
} }
/** /**
@ -252,7 +226,7 @@ export class ActiveExecutions {
private getExecution(executionId: string): IExecutingWorkflowData { private getExecution(executionId: string): IExecutingWorkflowData {
const execution = this.activeExecutions[executionId]; const execution = this.activeExecutions[executionId];
if (!execution) { if (!execution) {
throw new ApplicationError('No active execution found', { extra: { executionId } }); throw new ExecutionNotFoundError(executionId);
} }
return execution; return execution;
} }

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class ExecutionNotFoundError extends ApplicationError {
constructor(executionId: string) {
super('No active execution found', { extra: { executionId } });
}
}

View file

@ -193,7 +193,8 @@ export interface IExecutionsCurrentSummary {
export interface IExecutingWorkflowData { export interface IExecutingWorkflowData {
executionData: IWorkflowExecutionDataProcess; executionData: IWorkflowExecutionDataProcess;
startedAt: Date; startedAt: Date;
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>; /** This promise rejects when the execution is stopped. When the execution finishes (successfully or not), the promise resolves. */
postExecutePromise: IDeferredPromise<IRun | undefined>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>; responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
workflowExecution?: PCancelable<IRun>; workflowExecution?: PCancelable<IRun>;
status: ExecutionStatus; status: ExecutionStatus;

View file

@ -879,8 +879,7 @@ async function executeWorkflow(
fullExecutionData.workflowId = workflowData.id; fullExecutionData.workflowId = workflowData.id;
} }
// remove execution from active executions activeExecutions.finalizeExecution(executionId, fullRunData);
activeExecutions.remove(executionId, fullRunData);
await executionRepository.updateExistingExecution(executionId, fullExecutionData); await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError( throw objectToError(
@ -906,11 +905,11 @@ async function executeWorkflow(
if (data.finished === true || data.status === 'waiting') { if (data.finished === true || data.status === 'waiting') {
// Workflow did finish successfully // Workflow did finish successfully
activeExecutions.remove(executionId, data); activeExecutions.finalizeExecution(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main; return returnData!.data!.main;
} }
activeExecutions.remove(executionId, data); activeExecutions.finalizeExecution(executionId, data);
// Workflow did fail // Workflow did fail
const { error } = data.data.resultData; const { error } = data.data.resultData;

View file

@ -26,7 +26,6 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config'; import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks'; import { ExternalHooks } from '@/external-hooks';
import type { IExecutionResponse } from '@/interfaces';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { NodeTypes } from '@/node-types'; import { NodeTypes } from '@/node-types';
import type { ScalingService } from '@/scaling/scaling.service'; import type { ScalingService } from '@/scaling/scaling.service';
@ -102,7 +101,7 @@ export class WorkflowRunner {
// Remove from active execution with empty data. That will // Remove from active execution with empty data. That will
// set the execution to failed. // set the execution to failed.
this.activeExecutions.remove(executionId, fullRunData); this.activeExecutions.finalizeExecution(executionId, fullRunData);
if (hooks) { if (hooks) {
await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]); await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]);
@ -132,7 +131,7 @@ export class WorkflowRunner {
await workflowHooks.executeHookFunctions('workflowExecuteBefore', []); await workflowHooks.executeHookFunctions('workflowExecuteBefore', []);
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
responsePromise?.reject(error); responsePromise?.reject(error);
this.activeExecutions.remove(executionId); this.activeExecutions.finalizeExecution(executionId);
return executionId; return executionId;
} }
@ -336,7 +335,7 @@ export class WorkflowRunner {
fullRunData.finished = false; fullRunData.finished = false;
} }
fullRunData.status = this.activeExecutions.getStatus(executionId); fullRunData.status = this.activeExecutions.getStatus(executionId);
this.activeExecutions.remove(executionId, fullRunData); this.activeExecutions.finalizeExecution(executionId, fullRunData);
}) })
.catch( .catch(
async (error) => async (error) =>
@ -505,45 +504,24 @@ export class WorkflowRunner {
reject(error); reject(error);
} }
// optimization: only pull and unflatten execution data from the Db when it is needed
const executionHasPostExecutionPromises =
this.activeExecutions.getPostExecutePromiseCount(executionId) > 0;
if (executionHasPostExecutionPromises) {
this.logger.debug(
`Reading execution data for execution ${executionId} from db for PostExecutionPromise.`,
);
} else {
this.logger.debug(
`Skipping execution data for execution ${executionId} since there are no PostExecutionPromise.`,
);
}
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: executionHasPostExecutionPromises, includeData: true,
unflattenData: executionHasPostExecutionPromises, unflattenData: true,
}); });
if (!fullExecutionData) { if (!fullExecutionData) {
return reject(new Error(`Could not find execution with id "${executionId}"`)); return reject(new Error(`Could not find execution with id "${executionId}"`));
} }
const runData: IRun = { const runData: IRun = {
data: {},
finished: fullExecutionData.finished, finished: fullExecutionData.finished,
mode: fullExecutionData.mode, mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt, startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt, stoppedAt: fullExecutionData.stoppedAt,
status: fullExecutionData.status, status: fullExecutionData.status,
} as IRun; data: fullExecutionData.data,
};
if (executionHasPostExecutionPromises) { this.activeExecutions.finalizeExecution(executionId, runData);
runData.data = (fullExecutionData as IExecutionResponse).data;
}
// NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present,
// the execution data in runData.data MAY not be available here.
// This means that any function expecting with runData has to check if the runData.data defined from this point
this.activeExecutions.remove(executionId, runData);
// Normally also static data should be supplied here but as it only used for sending // Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed. // data to editor-UI is not needed.