fix(core): Stopping an execution should reject any response promises (#9992)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-07-16 19:25:20 +02:00 committed by GitHub
parent 5e57b0d71e
commit 36b314d031
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 55 additions and 23 deletions

View file

@ -6,7 +6,12 @@ import type {
IRun,
ExecutionStatus,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise, sleep } from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
ExecutionCancelledError,
sleep,
} from 'n8n-workflow';
import type {
ExecutionPayload,
@ -138,16 +143,13 @@ export class ActiveExecutions {
promise.resolve(fullRunData);
}
// Remove from the list of active executions
delete this.activeExecutions[executionId];
this.concurrencyControl.release({ mode: execution.executionData.executionMode });
this.postExecuteCleanup(executionId);
}
/**
* Forces an execution to stop
*/
async stopExecution(executionId: string): Promise<IRun | undefined> {
stopExecution(executionId: string): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
// There is no execution running with that id
@ -156,7 +158,25 @@ export class ActiveExecutions {
execution.workflowExecution!.cancel();
return await this.getPostExecutePromise(executionId);
// Reject all the waiting promises
const reason = new ExecutionCancelledError(executionId);
for (const promise of execution.postExecutePromises) {
promise.reject(reason);
}
this.postExecuteCleanup(executionId);
}
private postExecuteCleanup(executionId: string) {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
return;
}
// Remove from the list of active executions
delete this.activeExecutions[executionId];
this.concurrencyControl.release({ mode: execution.executionData.executionMode });
}
/**
@ -215,11 +235,7 @@ export class ActiveExecutions {
await this.concurrencyControl.removeAll(this.activeExecutions);
}
const stopPromises = executionIds.map(
async (executionId) => await this.stopExecution(executionId),
);
await Promise.allSettled(stopPromises);
executionIds.forEach((executionId) => this.stopExecution(executionId));
}
let count = 0;

View file

@ -86,7 +86,7 @@ export class WaitTracker {
}
}
async stopExecution(executionId: string) {
stopExecution(executionId: string) {
if (!this.waitingExecutions[executionId]) return;
clearTimeout(this.waitingExecutions[executionId].timer);

View file

@ -16,8 +16,8 @@ import type {
} from 'n8n-workflow';
import {
ErrorReporterProxy as ErrorReporter,
ExecutionCancelledError,
Workflow,
WorkflowOperationError,
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
@ -188,6 +188,7 @@ export class WorkflowRunner {
}
})
.catch((error) => {
if (error instanceof ExecutionCancelledError) return;
ErrorReporter.error(error);
this.logger.error(
'There was a problem running internal hook "onWorkflowPostExecute"',
@ -426,7 +427,7 @@ export class WorkflowRunner {
{ retryOf: data.retryOf ? data.retryOf.toString() : undefined },
);
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');
const error = new ExecutionCancelledError(executionId);
await this.processError(error, new Date(), data.executionMode, executionId, hooksWorker);
reject(error);

View file

@ -20,14 +20,16 @@ import type {
SelectQueryBuilder,
} from '@n8n/typeorm';
import { parse, stringify } from 'flatted';
import { GlobalConfig } from '@n8n/config';
import {
ApplicationError,
WorkflowOperationError,
type ExecutionStatus,
type ExecutionSummary,
type IRunExecutionData,
} from 'n8n-workflow';
import { BinaryDataService } from 'n8n-core';
import { ExecutionCancelledError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import type {
ExecutionPayload,
IExecutionBase,
@ -43,9 +45,7 @@ import { ExecutionDataRepository } from './executionData.repository';
import { Logger } from '@/Logger';
import type { ExecutionSummaries } from '@/executions/execution.types';
import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error';
import { GlobalConfig } from '@n8n/config';
import { separate } from '@/utils';
import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
@ -641,8 +641,9 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}
async stopDuringRun(execution: IExecutionResponse) {
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');
const error = new ExecutionCancelledError(execution.id);
execution.data ??= { resultData: { runData: {} } };
execution.data.resultData.error = {
...error,
message: error.message,

View file

@ -444,11 +444,11 @@ export class ExecutionService {
}
if (this.activeExecutions.has(execution.id)) {
await this.activeExecutions.stopExecution(execution.id);
this.activeExecutions.stopExecution(execution.id);
}
if (this.waitTracker.has(execution.id)) {
await this.waitTracker.stopExecution(execution.id);
this.waitTracker.stopExecution(execution.id);
}
return await this.executionRepository.stopDuringRun(execution);
@ -461,11 +461,11 @@ export class ExecutionService {
}
if (this.activeExecutions.has(execution.id)) {
await this.activeExecutions.stopExecution(execution.id);
this.activeExecutions.stopExecution(execution.id);
}
if (this.waitTracker.has(execution.id)) {
await this.waitTracker.stopExecution(execution.id);
this.waitTracker.stopExecution(execution.id);
}
const job = await this.queue.findRunningJobBy({ executionId: execution.id });

View file

@ -29,6 +29,7 @@ describe('ExecutionService', () => {
mock(),
mock(),
mock(),
mock(),
);
});

View file

@ -19,6 +19,7 @@ import type { Workflow } from './Workflow';
import type { WorkflowActivationError } from './errors/workflow-activation.error';
import type { WorkflowOperationError } from './errors/workflow-operation.error';
import type { WorkflowHooks } from './WorkflowHooks';
import type { ExecutionCancelledError } from './errors';
import type { NodeOperationError } from './errors/node-operation.error';
import type { NodeApiError } from './errors/node-api.error';
import type { AxiosProxyConfig } from 'axios';
@ -80,6 +81,7 @@ export type ExecutionError =
| ExpressionError
| WorkflowActivationError
| WorkflowOperationError
| ExecutionCancelledError
| NodeOperationError
| NodeApiError;

View file

@ -0,0 +1,10 @@
import { ExecutionBaseError } from './abstract/execution-base.error';
export class ExecutionCancelledError extends ExecutionBaseError {
constructor(executionId: string) {
super('The execution was cancelled', {
level: 'warning',
extra: { executionId },
});
}
}

View file

@ -1,6 +1,7 @@
export { ApplicationError } from './application.error';
export { ExpressionError } from './expression.error';
export { CredentialAccessError } from './credential-access-error';
export { ExecutionCancelledError } from './execution-cancelled.error';
export { NodeApiError } from './node-api.error';
export { NodeOperationError } from './node-operation.error';
export { NodeSslError } from './node-ssl.error';