address PR comments

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-18 17:10:21 +02:00
parent c3a4bf59e2
commit 52368f2e8e
No known key found for this signature in database
GPG key ID: 9300FF7CDEA1FBAA
6 changed files with 29 additions and 25 deletions

View file

@ -99,7 +99,7 @@ describe('ActiveExecutions', () => {
const executionId = await activeExecutions.add(newExecution); const executionId = await activeExecutions.add(newExecution);
// ACT // ACT
activeExecutions.finishExecution(executionId); activeExecutions.finalizeExecution(executionId);
// Wait until the next tick to ensure that the post-execution promise has settled // Wait until the next tick to ensure that the post-execution promise has settled
await new Promise(setImmediate); await new Promise(setImmediate);
@ -117,7 +117,7 @@ describe('ActiveExecutions', () => {
setTimeout(res, 100); setTimeout(res, 100);
}); });
const fakeOutput = mockFullRunData(); const fakeOutput = mockFullRunData();
activeExecutions.finishExecution(executionId, fakeOutput); activeExecutions.finalizeExecution(executionId, fakeOutput);
await expect(postExecutePromise).resolves.toEqual(fakeOutput); await expect(postExecutePromise).resolves.toEqual(fakeOutput);
}); });

View file

@ -5,17 +5,13 @@ import type {
ExecutionStatus, ExecutionStatus,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow';
ApplicationError,
createDeferredPromise,
ExecutionCancelledError,
sleep,
} from 'n8n-workflow';
import { strict as assert } from 'node:assert'; import { strict as assert } from 'node:assert';
import type PCancelable from 'p-cancelable'; import type PCancelable from 'p-cancelable';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import type { import type {
ExecutionPayload, ExecutionPayload,
IExecutingWorkflowData, IExecutingWorkflowData,
@ -107,14 +103,16 @@ export class ActiveExecutions {
// Automatically remove execution once the postExecutePromise settles // Automatically remove execution once the postExecutePromise settles
void postExecutePromise.promise void postExecutePromise.promise
.catch((error) => { .catch((error) => {
// rethrow the error unless it's ExecutionCancelledError if (error instanceof ExecutionCancelledError) return;
if (!(error instanceof ExecutionCancelledError)) throw error; throw error;
}) })
.finally(() => { .finally(() => {
this.concurrencyControl.release({ mode: executionData.executionMode }); this.concurrencyControl.release({ mode: executionData.executionMode });
delete this.activeExecutions[executionId]; delete this.activeExecutions[executionId];
}); });
this.logger.debug('Execution added', { executionId });
return executionId; return executionId;
} }
@ -138,17 +136,19 @@ export class ActiveExecutions {
execution?.responsePromise?.resolve(response); execution?.responsePromise?.resolve(response);
} }
/** Forces an execution to stop */ /** Cancel the execution promise and reject its post-execution promise. */
stopExecution(executionId: string): void { stopExecution(executionId: string): void {
const execution = this.getExecution(executionId); const execution = this.getExecution(executionId);
execution.workflowExecution?.cancel(); execution.workflowExecution?.cancel();
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId)); execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
this.logger.debug('Execution cancelled', { executionId });
} }
/** Mark an execution as completed */ /** Resolve the post-execution promise in an execution. */
finishExecution(executionId: string, fullRunData?: IRun) { finalizeExecution(executionId: string, fullRunData?: IRun) {
const execution = this.getExecution(executionId); const execution = this.getExecution(executionId);
execution.postExecutePromise.resolve(fullRunData); execution.postExecutePromise.resolve(fullRunData);
this.logger.debug('Execution finalized', { executionId });
} }
/** /**
@ -221,7 +221,7 @@ export class ActiveExecutions {
private getExecution(executionId: string): IExecutingWorkflowData { private getExecution(executionId: string): IExecutingWorkflowData {
const execution = this.activeExecutions[executionId]; const execution = this.activeExecutions[executionId];
if (!execution) { if (!execution) {
throw new ApplicationError('No active execution found', { extra: { executionId } }); throw new ExecutionNotFoundError(executionId);
} }
return execution; return execution;
} }

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class ExecutionNotFoundError extends ApplicationError {
constructor(executionId: string) {
super('No active execution found', { extra: { executionId } });
}
}

View file

@ -193,6 +193,7 @@ export interface IExecutionsCurrentSummary {
export interface IExecutingWorkflowData { export interface IExecutingWorkflowData {
executionData: IWorkflowExecutionDataProcess; executionData: IWorkflowExecutionDataProcess;
startedAt: Date; startedAt: Date;
/** This promise rejects when the execution is stopped. When the execution finishes (successfully or not), the promise resolves. */
postExecutePromise: IDeferredPromise<IRun | undefined>; postExecutePromise: IDeferredPromise<IRun | undefined>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>; responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
workflowExecution?: PCancelable<IRun>; workflowExecution?: PCancelable<IRun>;

View file

@ -879,8 +879,7 @@ async function executeWorkflow(
fullExecutionData.workflowId = workflowData.id; fullExecutionData.workflowId = workflowData.id;
} }
// remove execution from active executions activeExecutions.finalizeExecution(executionId, fullRunData);
activeExecutions.finishExecution(executionId, fullRunData);
await executionRepository.updateExistingExecution(executionId, fullExecutionData); await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError( throw objectToError(
@ -906,11 +905,11 @@ async function executeWorkflow(
if (data.finished === true || data.status === 'waiting') { if (data.finished === true || data.status === 'waiting') {
// Workflow did finish successfully // Workflow did finish successfully
activeExecutions.finishExecution(executionId, data); activeExecutions.finalizeExecution(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main; return returnData!.data!.main;
} }
activeExecutions.finishExecution(executionId, data); activeExecutions.finalizeExecution(executionId, data);
// Workflow did fail // Workflow did fail
const { error } = data.data.resultData; const { error } = data.data.resultData;

View file

@ -101,7 +101,7 @@ export class WorkflowRunner {
// Remove from active execution with empty data. That will // Remove from active execution with empty data. That will
// set the execution to failed. // set the execution to failed.
this.activeExecutions.finishExecution(executionId, fullRunData); this.activeExecutions.finalizeExecution(executionId, fullRunData);
if (hooks) { if (hooks) {
await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]); await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]);
@ -131,7 +131,7 @@ export class WorkflowRunner {
await workflowHooks.executeHookFunctions('workflowExecuteBefore', []); await workflowHooks.executeHookFunctions('workflowExecuteBefore', []);
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
responsePromise?.reject(error); responsePromise?.reject(error);
this.activeExecutions.finishExecution(executionId); this.activeExecutions.finalizeExecution(executionId);
return executionId; return executionId;
} }
@ -335,7 +335,7 @@ export class WorkflowRunner {
fullRunData.finished = false; fullRunData.finished = false;
} }
fullRunData.status = this.activeExecutions.getStatus(executionId); fullRunData.status = this.activeExecutions.getStatus(executionId);
this.activeExecutions.finishExecution(executionId, fullRunData); this.activeExecutions.finalizeExecution(executionId, fullRunData);
}) })
.catch( .catch(
async (error) => async (error) =>
@ -521,10 +521,7 @@ export class WorkflowRunner {
data: fullExecutionData.data, data: fullExecutionData.data,
}; };
// NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present, this.activeExecutions.finalizeExecution(executionId, runData);
// the execution data in runData.data MAY not be available here.
// This means that any function expecting with runData has to check if the runData.data defined from this point
this.activeExecutions.finishExecution(executionId, runData);
// Normally also static data should be supplied here but as it only used for sending // Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed. // data to editor-UI is not needed.