Compare commits

..

No commits in common. "e9fd2be0b356caa974838b8a3d7ac8a3e6f49b8f" and "047b1d13c0b1ef9600130253fcd97635597291c2" have entirely different histories.

2 changed files with 40 additions and 15 deletions

View file

@ -5,7 +5,12 @@ import type {
ExecutionStatus,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow';
import {
createDeferredPromise,
ErrorReporterProxy,
ExecutionCancelledError,
sleep,
} from 'n8n-workflow';
import { strict as assert } from 'node:assert';
import type PCancelable from 'p-cancelable';
import { Service } from 'typedi';
@ -122,14 +127,20 @@ export class ActiveExecutions {
*/
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
this.getExecution(executionId).workflowExecution = workflowExecution;
const execution = this.getExecution(executionId);
if (execution) {
execution.workflowExecution = workflowExecution;
}
}
attachResponsePromise(
executionId: string,
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
): void {
this.getExecution(executionId).responsePromise = responsePromise;
const execution = this.getExecution(executionId);
if (execution) {
execution.responsePromise = responsePromise;
}
}
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
@ -140,23 +151,31 @@ export class ActiveExecutions {
/** Cancel the execution promise and reject its post-execution promise. */
stopExecution(executionId: string): void {
const execution = this.getExecution(executionId);
execution.workflowExecution?.cancel();
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
this.logger.debug('Execution cancelled', { executionId });
if (execution) {
execution.workflowExecution?.cancel();
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
this.logger.debug('Execution cancelled', { executionId });
}
}
/** Resolve the post-execution promise in an execution. */
finalizeExecution(executionId: string, fullRunData?: IRun) {
const execution = this.getExecution(executionId);
execution.postExecutePromise.resolve(fullRunData);
this.logger.debug('Execution finalized', { executionId });
if (execution) {
execution.postExecutePromise.resolve(fullRunData);
this.logger.debug('Execution finalized', { executionId });
}
}
/**
* Returns a promise which will resolve with the data of the execution with the given id
*/
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
return await this.getExecution(executionId).postExecutePromise.promise;
const execution = this.getExecution(executionId);
if (execution) {
return await execution.postExecutePromise.promise;
}
return undefined;
}
/**
@ -183,11 +202,14 @@ export class ActiveExecutions {
}
setStatus(executionId: string, status: ExecutionStatus) {
this.getExecution(executionId).status = status;
const execution = this.getExecution(executionId);
if (execution) {
execution.status = status;
}
}
getStatus(executionId: string): ExecutionStatus {
return this.getExecution(executionId).status;
getStatus(executionId: string): ExecutionStatus | undefined {
return this.getExecution(executionId)?.status;
}
/** Wait for all active executions to finish */
@ -219,10 +241,10 @@ export class ActiveExecutions {
}
}
private getExecution(executionId: string): IExecutingWorkflowData {
private getExecution(executionId: string): IExecutingWorkflowData | undefined {
const execution = this.activeExecutions[executionId];
if (!execution) {
throw new ExecutionNotFoundError(executionId);
ErrorReporterProxy.error(new ExecutionNotFoundError(executionId));
}
return execution;
}

View file

@ -334,7 +334,10 @@ export class WorkflowRunner {
if (workflowExecution.isCanceled) {
fullRunData.finished = false;
}
fullRunData.status = this.activeExecutions.getStatus(executionId);
const status = this.activeExecutions.getStatus(executionId);
if (status) {
fullRunData.status = status;
}
this.activeExecutions.finalizeExecution(executionId, fullRunData);
})
.catch(