fix(core): Fix execution cancellation in scaling mode (#9841)

This commit is contained in:
Iván Ovejero 2024-06-28 20:05:09 +02:00 committed by GitHub
parent 10f7d4b5b9
commit e613de28ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 417 additions and 143 deletions

View file

@ -36,6 +36,10 @@ export class ActiveExecutions {
private readonly concurrencyControl: ConcurrencyControlService,
) {}
/**
 * Tell whether `activeExecutions` holds an entry for the given execution ID.
 *
 * @param executionId ID of the execution to look up
 * @returns `true` if the stored entry is anything other than `undefined`
 */
has(executionId: string) {
	const entry = this.activeExecutions[executionId];

	return entry !== undefined;
}
/**
* Add a new active execution
*/

View file

@ -58,6 +58,12 @@ export class Queue {
});
}
/**
 * Look up the queue job whose payload references the given execution.
 * Only jobs in the `active` or `waiting` state are searched.
 *
 * @returns the first matching job, or `null` when there is none
 */
async findRunningJobBy({ executionId }: { executionId: string }) {
	const candidates = await this.getJobs(['active', 'waiting']);
	const match = candidates.find((job) => job.data.executionId === executionId);

	return match ?? null;
}
decodeWebhookResponse(response: IExecuteResponsePromiseData): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&

View file

@ -1,10 +1,6 @@
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
WorkflowOperationError,
} from 'n8n-workflow';
import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import { Service } from 'typedi';
import type { ExecutionStopResult, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { OwnershipService } from '@/services/ownership.service';
@ -30,6 +26,10 @@ export class WaitTracker {
private readonly orchestrationService: OrchestrationService,
) {}
/**
 * Tell whether `waitingExecutions` holds an entry for the given execution ID.
 *
 * @param executionId ID of the execution to look up
 * @returns `true` if the stored entry is anything other than `undefined`
 */
has(executionId: string) {
	const entry = this.waitingExecutions[executionId];

	return entry !== undefined;
}
/**
* @important Requires `OrchestrationService` to be initialized.
*/
@ -101,53 +101,12 @@ export class WaitTracker {
}
}
async stopExecution(executionId: string): Promise<ExecutionStopResult> {
if (this.waitingExecutions[executionId] !== undefined) {
// The waiting execution was already scheduled to execute.
// So stop timer and remove.
clearTimeout(this.waitingExecutions[executionId].timer);
delete this.waitingExecutions[executionId];
}
async stopExecution(executionId: string) {
if (!this.waitingExecutions[executionId]) return;
// Also check in database
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
clearTimeout(this.waitingExecutions[executionId].timer);
if (!fullExecutionData) {
throw new ApplicationError('Execution not found.', {
extra: { executionId },
});
}
if (!['new', 'unknown', 'waiting', 'running'].includes(fullExecutionData.status)) {
throw new WorkflowOperationError(
`Only running or waiting executions can be stopped and ${executionId} is currently ${fullExecutionData.status}.`,
);
}
// Set in execution in DB as failed and remove waitTill time
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');
fullExecutionData.data.resultData.error = {
...error,
message: error.message,
stack: error.stack,
};
fullExecutionData.stoppedAt = new Date();
fullExecutionData.waitTill = null;
fullExecutionData.status = 'canceled';
await this.executionRepository.updateExistingExecution(executionId, fullExecutionData);
return {
mode: fullExecutionData.mode,
startedAt: new Date(fullExecutionData.startedAt),
stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
finished: fullExecutionData.finished,
status: fullExecutionData.status,
};
delete this.waitingExecutions[executionId];
}
startExecution(executionId: string) {

View file

@ -69,6 +69,15 @@ export class ConcurrencyControlService {
});
}
/**
 * Report whether the production queue currently contains the given execution.
 * Always `false` when `isEnabled` is falsy; the queue is not consulted in that case.
 */
has(executionId: string) {
	return this.isEnabled ? this.productionQueue.getAll().has(executionId) : false;
}
/**
* Block or let through an execution based on concurrency capacity.
*/

View file

@ -22,6 +22,7 @@ import type {
import { parse, stringify } from 'flatted';
import {
ApplicationError,
WorkflowOperationError,
type ExecutionStatus,
type ExecutionSummary,
type IRunExecutionData,
@ -609,8 +610,34 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
}
async cancel(executionId: string) {
await this.update({ id: executionId }, { status: 'canceled', stoppedAt: new Date() });
/**
 * Mark the given execution as `canceled` and stamp it with the current time.
 *
 * The passed object is mutated in place. Only `status` and `stoppedAt` are
 * written to the database; the rest of the row is left untouched.
 *
 * @returns the same (mutated) execution object
 */
async stopBeforeRun(execution: IExecutionResponse) {
	const stoppedAt = new Date();

	execution.status = 'canceled';
	execution.stoppedAt = stoppedAt;

	await this.update({ id: execution.id }, { status: execution.status, stoppedAt });

	return execution;
}
/**
 * Cancel an execution and persist the full execution record.
 *
 * Stores a `WorkflowOperationError` in `data.resultData.error`, clears
 * `waitTill`, sets `status` to `canceled` and `stoppedAt` to the current time,
 * then saves everything through `updateExistingExecution`.
 *
 * @returns the same (mutated) execution object
 */
async stopDuringRun(execution: IExecutionResponse) {
	const cancellation = new WorkflowOperationError('Workflow-Execution has been canceled!');
	const { message, stack } = cancellation;

	// `message` and `stack` are listed explicitly in addition to the spread.
	execution.data.resultData.error = { ...cancellation, message, stack };

	execution.status = 'canceled';
	execution.waitTill = null;
	execution.stoppedAt = new Date();

	await this.updateExistingExecution(execution.id, execution);

	return execution;
}
async cancelMany(executionIds: string[]) {

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
/**
 * Error raised when an execution that should be stopped cannot be found.
 * The offending execution ID is attached under `extra.executionId`.
 */
export class MissingExecutionStopError extends ApplicationError {
	constructor(executionId: string) {
		const options = { extra: { executionId } };

		super('Failed to find execution to stop', options);
	}
}

View file

@ -0,0 +1,269 @@
import { mock } from 'jest-mock-extended';
import { WorkflowOperationError } from 'n8n-workflow';
import config from '@/config';
import { ExecutionService } from '@/executions/execution.service';
import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error';
import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error';
import type { ActiveExecutions } from '@/ActiveExecutions';
import type { IExecutionResponse } from '@/Interfaces';
import type { Job, Queue } from '@/Queue';
import type { WaitTracker } from '@/WaitTracker';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { ExecutionRequest } from '@/executions/execution.types';
import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
/**
 * Unit tests for `ExecutionService.retry()` and `ExecutionService.stop()`.
 *
 * Every collaborator is a `jest-mock-extended` mock; only the ones inspected by
 * assertions are given a name. Each test starts with `executions.mode` set to
 * `regular` and with the mocks' call history cleared (see `beforeEach`).
 */
describe('ExecutionService', () => {
	const queue = mock<Queue>();
	const activeExecutions = mock<ActiveExecutions>();
	const executionRepository = mock<ExecutionRepository>();
	const waitTracker = mock<WaitTracker>();
	const concurrencyControl = mock<ConcurrencyControlService>();

	const executionService = new ExecutionService(
		mock(),
		queue,
		activeExecutions,
		executionRepository,
		mock(),
		mock(),
		waitTracker,
		mock(),
		concurrencyControl,
		mock(),
	);

	beforeEach(() => {
		// Scaling-mode tests switch this to `queue`; reset it so tests stay independent.
		config.set('executions.mode', 'regular');
		jest.clearAllMocks();
	});

	describe('retry', () => {
		it('should error on retrying a execution that was aborted before starting', async () => {
			// Arrange
			executionRepository.findWithUnflattenedData.mockResolvedValue(
				mock<IExecutionResponse>({ data: { executionData: undefined } }),
			);
			const req = mock<ExecutionRequest.Retry>();

			// Act
			const retry = executionService.retry(req, []);

			// Assert
			await expect(retry).rejects.toThrow(AbortedExecutionRetryError);
		});
	});

	describe('stop', () => {
		it('should throw when stopping a missing execution', async () => {
			// Arrange
			executionRepository.findSingleExecution.mockResolvedValue(undefined);

			// Act
			const stop = executionService.stop('inexistent-123');

			// Assert
			await expect(stop).rejects.toThrowError(MissingExecutionStopError);
		});

		it('should throw when stopping a not-in-progress execution', async () => {
			// Arrange
			const execution = mock<IExecutionResponse>({ id: '123', status: 'success' });
			executionRepository.findSingleExecution.mockResolvedValue(execution);

			// Act
			const stop = executionService.stop(execution.id);

			// Assert
			await expect(stop).rejects.toThrowError(WorkflowOperationError);
		});

		describe('regular mode', () => {
			it('should stop a `running` execution in regular mode', async () => {
				// Arrange
				const execution = mock<IExecutionResponse>({ id: '123', status: 'running' });
				executionRepository.findSingleExecution.mockResolvedValue(execution);
				concurrencyControl.has.mockReturnValue(false);
				activeExecutions.has.mockReturnValue(true);
				waitTracker.has.mockReturnValue(false);
				executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>());

				// Act
				await executionService.stop(execution.id);

				// Assert
				expect(concurrencyControl.remove).not.toHaveBeenCalled();
				expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id);
				expect(waitTracker.stopExecution).not.toHaveBeenCalled();
				expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution);
			});

			it('should stop a `waiting` execution in regular mode', async () => {
				// Arrange
				const execution = mock<IExecutionResponse>({ id: '123', status: 'waiting' });
				executionRepository.findSingleExecution.mockResolvedValue(execution);
				concurrencyControl.has.mockReturnValue(false);
				activeExecutions.has.mockReturnValue(true);
				waitTracker.has.mockReturnValue(true);
				executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>());

				// Act
				await executionService.stop(execution.id);

				// Assert
				expect(concurrencyControl.remove).not.toHaveBeenCalled();
				expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id);
				expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id);
				expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution);
			});

			it('should stop a concurrency-controlled `new` execution in regular mode', async () => {
				// Arrange
				const execution = mock<IExecutionResponse>({ id: '123', status: 'new', mode: 'trigger' });
				executionRepository.findSingleExecution.mockResolvedValue(execution);
				concurrencyControl.has.mockReturnValue(true);
				activeExecutions.has.mockReturnValue(false);
				waitTracker.has.mockReturnValue(false);
				executionRepository.stopBeforeRun.mockResolvedValue(mock<IExecutionResponse>());

				// Act
				await executionService.stop(execution.id);

				// Assert
				expect(concurrencyControl.remove).toHaveBeenCalledWith({
					mode: execution.mode,
					executionId: execution.id,
				});
				// The positive half of the contract: the execution must be cancelled
				// in the DB via the "before run" path, not merely dequeued.
				expect(executionRepository.stopBeforeRun).toHaveBeenCalledWith(execution);
				expect(activeExecutions.stopExecution).not.toHaveBeenCalled();
				expect(waitTracker.stopExecution).not.toHaveBeenCalled();
				expect(executionRepository.stopDuringRun).not.toHaveBeenCalled();
			});
		});

		describe('scaling mode', () => {
			describe('manual execution', () => {
				it('should delegate to regular mode in scaling mode', async () => {
					// Arrange
					config.set('executions.mode', 'queue');
					const execution = mock<IExecutionResponse>({
						id: '123',
						mode: 'manual',
						status: 'running',
					});
					executionRepository.findSingleExecution.mockResolvedValue(execution);
					concurrencyControl.has.mockReturnValue(false);
					activeExecutions.has.mockReturnValue(true);
					waitTracker.has.mockReturnValue(false);
					executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>());
					// @ts-expect-error Private method
					const stopInRegularModeSpy = jest.spyOn(executionService, 'stopInRegularMode');

					// Act
					await executionService.stop(execution.id);

					// Assert
					expect(stopInRegularModeSpy).toHaveBeenCalledWith(execution);
					expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id);
					expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution);
					expect(concurrencyControl.remove).not.toHaveBeenCalled();
					expect(waitTracker.stopExecution).not.toHaveBeenCalled();
					expect(queue.stopJob).not.toHaveBeenCalled();
				});
			});

			describe('production execution', () => {
				it('should stop a `running` execution in scaling mode', async () => {
					// Arrange
					config.set('executions.mode', 'queue');
					const execution = mock<IExecutionResponse>({ id: '123', status: 'running' });
					executionRepository.findSingleExecution.mockResolvedValue(execution);
					waitTracker.has.mockReturnValue(false);
					queue.findRunningJobBy.mockResolvedValue(mock<Job>());
					executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>());

					// Act
					await executionService.stop(execution.id);

					// Assert
					expect(waitTracker.stopExecution).not.toHaveBeenCalled();
					expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id });
					expect(queue.stopJob).toHaveBeenCalled();
					expect(executionRepository.stopDuringRun).toHaveBeenCalled();
				});

				it('should stop a `waiting` execution in scaling mode', async () => {
					// Arrange
					config.set('executions.mode', 'queue');
					const execution = mock<IExecutionResponse>({ id: '123', status: 'waiting' });
					executionRepository.findSingleExecution.mockResolvedValue(execution);
					waitTracker.has.mockReturnValue(true);
					queue.findRunningJobBy.mockResolvedValue(mock<Job>());
					executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>());

					// Act
					await executionService.stop(execution.id);

					// Assert
					expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id);
					expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id });
					expect(queue.stopJob).toHaveBeenCalled();
					expect(executionRepository.stopDuringRun).toHaveBeenCalled();
				});
			});
		});
	});
});

View file

@ -24,7 +24,7 @@ import type {
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { Queue } from '@/Queue';
import type { ExecutionRequest, ExecutionSummaries } from './execution.types';
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository';
@ -34,7 +34,7 @@ import { InternalServerError } from '@/errors/response-errors/internal-server.er
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import config from '@/config';
import { WaitTracker } from '@/WaitTracker';
import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error';
import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error';
@ -328,8 +328,6 @@ export class ExecutionService {
// new API
// ----------------------------------
private readonly isRegularMode = config.getEnv('executions.mode') === 'regular';
/**
* Find summaries of executions that satisfy a query.
*
@ -392,59 +390,6 @@ export class ExecutionService {
};
}
/**
* Stop an active execution.
*
* Resolution order, first match wins:
* 1. status `new`: remove from concurrency control, cancel in the DB, return nothing.
* 2. `activeExecutions.stopExecution` yields a truthy result: return a summary of the execution.
* 3. regular mode: return whatever `waitTracker.stopExecution` returns (its errors propagate).
* 4. queue mode: try `waitTracker.stopExecution`; if it throws, fall through to stopping
*    the matching queue job (if any) and return a summary of the execution.
*
* @throws NotFoundError if no execution with this ID exists
*/
async stop(executionId: string) {
const execution = await this.executionRepository.findOneBy({ id: executionId });
if (!execution) throw new NotFoundError('Execution not found');
// Status `new`: dequeue from concurrency control and mark as canceled in the DB.
if (execution.status === 'new') {
this.concurrencyControl.remove({ mode: execution.mode, executionId });
await this.executionRepository.cancel(executionId);
return;
}
const stopResult = await this.activeExecutions.stopExecution(execution.id);
if (stopResult) return this.toExecutionStopResult(execution);
if (this.isRegularMode) {
return await this.waitTracker.stopExecution(execution.id);
}
// queue mode
try {
return await this.waitTracker.stopExecution(execution.id);
} catch {
// @TODO: Why are we swallowing this error in queue mode?
// Any error from the wait tracker is discarded; control continues to the queue lookup below.
}
// Look for a queue job (active or waiting) whose payload references this execution.
const activeJobs = await this.queue.getJobs(['active', 'waiting']);
const job = activeJobs.find(({ data }) => data.executionId === execution.id);
if (job) {
await this.queue.stopJob(job);
} else {
this.logger.debug('Job to stop no longer in queue', { jobId: execution.id });
}
// NOTE(review): `execution` was loaded before stopping, so this summary reflects the pre-stop row.
return this.toExecutionStopResult(execution);
}
/**
 * Build the stop summary for an execution entity.
 * `startedAt` and a truthy `stoppedAt` are wrapped in fresh `Date` objects;
 * a falsy `stoppedAt` becomes `undefined`.
 */
private toExecutionStopResult(execution: ExecutionEntity) {
	const { mode, startedAt, stoppedAt, finished, status } = execution;

	return {
		mode,
		startedAt: new Date(startedAt),
		stoppedAt: stoppedAt ? new Date(stoppedAt) : undefined,
		finished,
		status,
	};
}
async findAllEnqueuedExecutions() {
return await this.executionRepository.findMultipleExecutions(
{
@ -455,4 +400,76 @@ export class ExecutionService {
{ includeData: true, unflattenData: true },
);
}
/**
 * Stop an execution and return a summary of its final state.
 *
 * The execution is loaded with its unflattened data, checked for a stoppable
 * status, and then handed to the regular-mode or scaling-mode strategy
 * depending on the `executions.mode` setting read at call time.
 *
 * @throws MissingExecutionStopError if no execution with this ID is found
 * @throws WorkflowOperationError if the execution's status is not stoppable
 */
async stop(executionId: string): Promise<StopResult> {
	const execution = await this.executionRepository.findSingleExecution(executionId, {
		includeData: true,
		unflattenData: true,
	});

	if (!execution) throw new MissingExecutionStopError(executionId);

	this.assertStoppable(execution);

	const isRegularMode = config.getEnv('executions.mode') === 'regular';

	const stopped = isRegularMode
		? await this.stopInRegularMode(execution)
		: await this.stopInScalingMode(execution);

	const { mode, startedAt, stoppedAt, finished, status } = stopped;

	// Keep the `stoppedAt` key present even when there is no value to report.
	let stoppedAtDate: Date | undefined;
	if (stoppedAt) stoppedAtDate = new Date(stoppedAt);

	return { mode, startedAt: new Date(startedAt), stoppedAt: stoppedAtDate, finished, status };
}
/**
 * Ensure the execution is in a status from which it may be stopped:
 * `new`, `unknown`, `waiting` or `running`.
 *
 * @throws WorkflowOperationError for any other status
 */
private assertStoppable(execution: IExecutionResponse) {
	const stoppable: ExecutionStatus[] = ['new', 'unknown', 'waiting', 'running'];

	if (stoppable.includes(execution.status)) return;

	throw new WorkflowOperationError(
		`Only running or waiting executions can be stopped and ${execution.id} is currently ${execution.status}`,
	);
}
/**
 * Stop an execution handled by this process.
 *
 * If the execution is still held by concurrency control, it is removed from
 * there and cancelled through `stopBeforeRun`. Otherwise it is stopped in
 * whichever of `activeExecutions` / `waitTracker` know about it, and then
 * cancelled through `stopDuringRun`.
 */
private async stopInRegularMode(execution: IExecutionResponse) {
	const executionId = execution.id;
	const isHeldByConcurrencyControl = this.concurrencyControl.has(executionId);

	if (!isHeldByConcurrencyControl) {
		if (this.activeExecutions.has(executionId)) {
			await this.activeExecutions.stopExecution(executionId);
		}

		if (this.waitTracker.has(executionId)) {
			await this.waitTracker.stopExecution(executionId);
		}

		return await this.executionRepository.stopDuringRun(execution);
	}

	this.concurrencyControl.remove({ mode: execution.mode, executionId });

	return await this.executionRepository.stopBeforeRun(execution);
}
/**
 * Stop an execution while running in scaling mode.
 *
 * Manual executions are delegated to `stopInRegularMode`. For all others, the
 * wait tracker entry (if any) is stopped, the matching queue job (if any) is
 * stopped, and the execution is cancelled through `stopDuringRun`.
 */
private async stopInScalingMode(execution: IExecutionResponse) {
	const isManual = execution.mode === 'manual';

	// manual executions in scaling mode are processed by main
	if (isManual) return await this.stopInRegularMode(execution);

	const executionId = execution.id;

	if (this.waitTracker.has(executionId)) {
		await this.waitTracker.stopExecution(executionId);
	}

	const job = await this.queue.findRunningJobBy({ executionId });

	if (!job) {
		this.logger.debug('Job to stop not in queue', { executionId });
	} else {
		await this.queue.stopJob(job);
	}

	return await this.executionRepository.stopDuringRun(execution);
}
}

View file

@ -1,6 +1,6 @@
import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
import type { AuthenticatedRequest } from '@/requests';
import type { ExecutionStatus, IDataObject } from 'n8n-workflow';
import type { ExecutionStatus, IDataObject, WorkflowExecuteMode } from 'n8n-workflow';
export declare namespace ExecutionRequest {
namespace QueryParams {
@ -101,3 +101,11 @@ export type QueueRecoverySettings = {
*/
waitMs: number;
};
/** Summary of an execution as reported after a stop request. */
export type StopResult = {
	/** Status of the execution. */
	status: ExecutionStatus;
	/** Mode the execution was started in. */
	mode: WorkflowExecuteMode;
	/** Value of the execution's `finished` flag. */
	finished: boolean;
	/** When the execution started. */
	startedAt: Date;
	/** When the execution stopped, if a stop time is available. */
	stoppedAt?: Date;
};

View file

@ -1,32 +0,0 @@
import type { IExecutionResponse } from '@/Interfaces';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error';
import { ExecutionService } from '@/executions/execution.service';
import type { ExecutionRequest } from '@/executions/execution.types';
import { mock } from 'jest-mock-extended';
/**
 * Unit test for `ExecutionService.retry()`; only the execution repository
 * mock is configured, every other collaborator is an anonymous mock.
 */
describe('ExecutionService', () => {
	const executionRepository = mock<ExecutionRepository>();

	// prettier-ignore
	const executionService = new ExecutionService(
		mock(), mock(), mock(),
		executionRepository,
		mock(), mock(), mock(), mock(), mock(), mock(),
	);

	it('should error on retrying an aborted execution', async () => {
		// An execution whose data carries no `executionData`.
		executionRepository.findWithUnflattenedData.mockResolvedValue(
			mock<IExecutionResponse>({ data: { executionData: undefined } }),
		);

		const retry = executionService.retry(mock<ExecutionRequest.Retry>(), []);

		await expect(retry).rejects.toThrow(AbortedExecutionRetryError);
	});
});