perf(core): Introduce concurrency control for main mode (#9453)

This commit is contained in:
Iván Ovejero 2024-06-12 15:05:43 +02:00 committed by GitHub
parent 6c1a4c8ebf
commit 797342343f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 919 additions and 57 deletions

View file

@ -18,6 +18,8 @@ import type {
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ConcurrencyControlService } from './concurrency/concurrency-control.service';
import config from './config';
@Service()
export class ActiveExecutions {
@ -31,6 +33,7 @@ export class ActiveExecutions {
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly concurrencyControl: ConcurrencyControlService,
) {}
/**
@ -38,12 +41,13 @@ export class ActiveExecutions {
*/
async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise<string> {
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
const mode = executionData.executionMode;
if (executionId === undefined) {
// Is a new execution so save in DB
const fullExecutionData: ExecutionPayload = {
data: executionData.executionData!,
mode: executionData.executionMode,
mode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
@ -64,10 +68,14 @@ export class ActiveExecutions {
if (executionId === undefined) {
throw new ApplicationError('There was an issue assigning an execution id to the execution');
}
await this.concurrencyControl.throttle({ mode, executionId });
executionStatus = 'running';
} else {
// Is an existing execution we want to finish so update in DB
await this.concurrencyControl.throttle({ mode, executionId });
const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = {
id: executionId,
data: executionData.executionData!,
@ -128,6 +136,8 @@ export class ActiveExecutions {
// Remove from the list of active executions
delete this.activeExecutions[executionId];
this.concurrencyControl.release({ mode: execution.executionData.executionMode });
}
/**
@ -191,6 +201,10 @@ export class ActiveExecutions {
let executionIds = Object.keys(this.activeExecutions);
if (cancelAll) {
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.removeAll(this.activeExecutions);
}
const stopPromises = executionIds.map(
async (executionId) => await this.stopExecution(executionId),
);

View file

@ -1261,4 +1261,8 @@ export class InternalHooks {
}) {
return await this.telemetry.track('Project settings updated', data);
}
async onConcurrencyLimitHit({ threshold }: { threshold: number }) {
await this.telemetry.track('User hit concurrency limit', { threshold });
}
}

View file

@ -9,6 +9,7 @@ import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
export = {
deleteExecution: [
@ -32,6 +33,19 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}
if (execution.status === 'running') {
return res.status(400).json({
message: 'Cannot delete a running execution',
});
}
if (execution.status === 'new') {
Container.get(ConcurrencyControlService).remove({
executionId: execution.id,
mode: execution.mode,
});
}
await Container.get(ExecutionRepository).hardDelete({
workflowId: execution.workflowId,
executionId: execution.id,

View file

@ -27,6 +27,10 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/WaitTracker';
import { BaseCommand } from './BaseCommand';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
@ -288,6 +292,10 @@ export class Start extends BaseCommand {
await this.initPruning();
if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions();
}
// Start to get active workflows and run their triggers
await this.activeWorkflowManager.init();
@ -347,4 +355,38 @@ export class Start extends BaseCommand {
if (error.stack) this.logger.error(error.stack);
await this.exitWithCrash('Exiting due to an error.', error);
}
/**
* During startup, we may find executions that had been enqueued at the time of shutdown.
*
* If so, start running any such executions concurrently up to the concurrency limit, and
* enqueue any remaining ones until we have spare concurrency capacity again.
*/
private async runEnqueuedExecutions() {
const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions();
if (executions.length === 0) return;
this.logger.debug(
'[Startup] Found enqueued executions to run',
executions.map((e) => e.id),
);
const ownershipService = Container.get(OwnershipService);
const workflowRunner = Container.get(WorkflowRunner);
for (const execution of executions) {
const project = await ownershipService.getWorkflowProjectCached(execution.workflowId);
const data: IWorkflowExecutionDataProcess = {
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
};
// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}
}
}

View file

@ -317,8 +317,12 @@ export class Worker extends BaseCommand {
Worker.jobQueue = Container.get(Queue);
await Worker.jobQueue.init();
this.logger.debug('Queue singleton ready');
const envConcurrency = config.getEnv('executions.concurrency.productionLimit');
const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency;
void Worker.jobQueue.process(
flags.concurrency,
concurrency,
async (job) => await this.runJob(job, this.nodeTypes),
);

View file

@ -0,0 +1,369 @@
import { mock } from 'jest-mock-extended';
import config from '@/config';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import type { Logger } from '@/Logger';
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
import { ConcurrencyQueue } from '../concurrency-queue';
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutingWorkflowData } from '@/Interfaces';
import type { Telemetry } from '@/telemetry';
describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
afterEach(() => {
config.set('executions.concurrency.productionLimit', -1);
config.set('executions.mode', 'integrated');
jest.clearAllMocks();
});
describe('constructor', () => {
it('should be enabled if production cap is positive', () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
/**
* Assert
*/
// @ts-expect-error Private property
expect(service.isEnabled).toBe(true);
// @ts-expect-error Private property
expect(service.productionQueue).toBeDefined();
});
it('should throw if production cap is 0', () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 0);
try {
/**
* Act
*/
new ConcurrencyControlService(logger, executionRepository, telemetry);
} catch (error) {
/**
* Assert
*/
expect(error).toBeInstanceOf(InvalidConcurrencyLimitError);
}
});
it('should be disabled if production cap is -1', () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', -1);
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
/**
* Assert
*/
// @ts-expect-error Private property
expect(service.isEnabled).toBe(false);
});
it('should be disabled if production cap is lower than -1', () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', -2);
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
/**
* Act
*/
// @ts-expect-error Private property
expect(service.isEnabled).toBe(false);
});
it('should be disabled on queue mode', () => {
/**
* Arrange
*/
config.set('executions.mode', 'queue');
config.set('executions.concurrency.productionLimit', 2);
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
/**
* Assert
*/
// @ts-expect-error Private property
expect(service.isEnabled).toBe(false);
});
});
// ----------------------------------
// enabled
// ----------------------------------
describe('if enabled', () => {
describe('throttle', () => {
it.each(['cli', 'error', 'integrated', 'internal', 'manual', 'retry'])(
'should do nothing on %s mode',
async (mode: ExecutionMode) => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
* Act
*/
await service.throttle({ mode, executionId: '1' });
/**
* Assert
*/
expect(enqueueSpy).not.toHaveBeenCalled();
},
);
it.each(['webhook', 'trigger'])('should enqueue on %s mode', async (mode: ExecutionMode) => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
* Act
*/
await service.throttle({ mode, executionId: '1' });
/**
* Assert
*/
expect(enqueueSpy).toHaveBeenCalled();
});
});
describe('release', () => {
it.each(['cli', 'error', 'integrated', 'internal', 'manual', 'retry'])(
'should do nothing on %s mode',
async (mode: ExecutionMode) => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
* Act
*/
await service.throttle({ mode, executionId: '1' });
/**
* Assert
*/
expect(dequeueSpy).not.toHaveBeenCalled();
},
);
it.each(['webhook', 'trigger'])('should dequeue on %s mode', (mode: ExecutionMode) => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
* Act
*/
service.release({ mode });
/**
* Assert
*/
expect(dequeueSpy).toHaveBeenCalled();
});
});
describe('remove', () => {
it.each(['cli', 'error', 'integrated', 'internal', 'manual', 'retry'])(
'should do nothing on %s mode',
async (mode: ExecutionMode) => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
* Act
*/
await service.throttle({ mode, executionId: '1' });
/**
* Assert
*/
expect(removeSpy).not.toHaveBeenCalled();
},
);
it.each(['webhook', 'trigger'])(
'should remove an execution on %s mode',
(mode: ExecutionMode) => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
* Act
*/
service.remove({ mode, executionId: '1' });
/**
* Assert
*/
expect(removeSpy).toHaveBeenCalled();
},
);
});
describe('removeAll', () => {
it('should remove all executions from the production queue', async () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 2);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
jest
.spyOn(ConcurrencyQueue.prototype, 'getAll')
.mockReturnValueOnce(new Set(['1', '2', '3']));
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
* Act
*/
await service.removeAll({
'1': mock<IExecutingWorkflowData>(),
'2': mock<IExecutingWorkflowData>(),
'3': mock<IExecutingWorkflowData>(),
});
/**
* Assert
*/
expect(removeSpy).toHaveBeenNthCalledWith(1, '1');
expect(removeSpy).toHaveBeenNthCalledWith(2, '2');
expect(removeSpy).toHaveBeenNthCalledWith(3, '3');
});
});
});
// ----------------------------------
// disabled
// ----------------------------------
describe('if disabled', () => {
describe('throttle', () => {
it('should do nothing', async () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', -1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
* Act
*/
await service.throttle({ mode: 'trigger', executionId: '1' });
await service.throttle({ mode: 'webhook', executionId: '2' });
/**
* Assert
*/
expect(enqueueSpy).not.toHaveBeenCalled();
});
});
describe('release', () => {
it('should do nothing', () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', -1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
* Act
*/
service.release({ mode: 'webhook' });
/**
* Assert
*/
expect(dequeueSpy).not.toHaveBeenCalled();
});
});
describe('remove', () => {
it('should do nothing', () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', -1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
* Act
*/
service.remove({ mode: 'webhook', executionId: '1' });
/**
* Assert
*/
expect(removeSpy).not.toHaveBeenCalled();
});
});
});
});

View file

@ -0,0 +1,61 @@
import { ConcurrencyQueue } from '../concurrency-queue';
describe('ConcurrencyQueue', () => {
beforeAll(() => {
jest.useFakeTimers();
});
it('should limit concurrency', async () => {
const queue = new ConcurrencyQueue(1);
const state: Record<string, 'started' | 'finished'> = {};
// eslint-disable-next-line @typescript-eslint/promise-function-async
const sleep = jest.fn(() => new Promise((resolve) => setTimeout(resolve, 500)));
const testFn = async (item: { executionId: string }) => {
await queue.enqueue(item.executionId);
state[item.executionId] = 'started';
await sleep();
queue.dequeue();
state[item.executionId] = 'finished';
};
void Promise.all([
testFn({ executionId: '1' }),
testFn({ executionId: '2' }),
testFn({ executionId: '3' }),
testFn({ executionId: '4' }),
testFn({ executionId: '5' }),
]);
// At T+0 seconds this method hasn't yielded to the event-loop, so no `testFn` calls are made
expect(sleep).toHaveBeenCalledTimes(0);
expect(state).toEqual({});
// At T+0.4 seconds the first `testFn` has been called, but hasn't resolved
await jest.advanceTimersByTimeAsync(400);
expect(sleep).toHaveBeenCalledTimes(1);
expect(state).toEqual({ 1: 'started' });
// At T+0.5 seconds the first promise has resolved, and the second one has stared
await jest.advanceTimersByTimeAsync(100);
expect(sleep).toHaveBeenCalledTimes(2);
expect(state).toEqual({ 1: 'finished', 2: 'started' });
// At T+1 seconds the first two promises have resolved, and the third one has stared
await jest.advanceTimersByTimeAsync(500);
expect(sleep).toHaveBeenCalledTimes(3);
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started' });
// If the fourth promise is removed, the fifth one is started in the next tick
queue.remove('4');
await jest.advanceTimersByTimeAsync(1);
expect(sleep).toHaveBeenCalledTimes(4);
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started', 5: 'started' });
// at T+5 seconds, all but the fourth promise should be resolved
await jest.advanceTimersByTimeAsync(4000);
expect(sleep).toHaveBeenCalledTimes(4);
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'finished', 5: 'finished' });
});
});

View file

@ -0,0 +1,160 @@
import { Logger } from '@/Logger';
import config from '@/config';
import { Service } from 'typedi';
import { ConcurrencyQueue } from './concurrency-queue';
import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error';
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { IExecutingWorkflowData } from '@/Interfaces';
import { Telemetry } from '@/telemetry';
@Service()
export class ConcurrencyControlService {
private readonly isEnabled: boolean;
private readonly productionLimit: number;
private readonly productionQueue: ConcurrencyQueue;
private readonly limitsToReport = [5, 10, 20, 50, 100, 200];
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly telemetry: Telemetry,
) {
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
if (this.productionLimit === 0) {
throw new InvalidConcurrencyLimitError(this.productionLimit);
}
if (this.productionLimit < -1) {
this.productionLimit = -1;
}
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
this.isEnabled = false;
this.log('Service disabled');
return;
}
this.productionQueue = new ConcurrencyQueue(this.productionLimit);
this.logInit();
this.isEnabled = true;
this.productionQueue.on(
'execution-throttled',
async ({ executionId, capacity }: { executionId: string; capacity: number }) => {
this.log('Execution throttled', { executionId });
/**
* Temporary until base data for cloud plans is collected.
*/
if (this.shouldReport(capacity)) {
await this.telemetry.track('User hit concurrency limit', { threshold: capacity });
}
},
);
this.productionQueue.on('execution-released', async (executionId: string) => {
this.log('Execution released', { executionId });
await this.executionRepository.resetStartedAt(executionId);
});
}
/**
* Block or let through an execution based on concurrency capacity.
*/
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return;
await this.productionQueue.enqueue(executionId);
}
/**
* Release capacity back so the next execution in the production queue can proceed.
*/
release({ mode }: { mode: ExecutionMode }) {
if (!this.isEnabled || this.isUnlimited(mode)) return;
this.productionQueue.dequeue();
}
/**
* Remove an execution from the production queue, releasing capacity back.
*/
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return;
this.productionQueue.remove(executionId);
}
/**
* Empty the production queue, releasing all capacity back. Also cancel any
* enqueued executions that have response promises, as these cannot
* be re-run via `Start.runEnqueuedExecutions` during startup.
*/
async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) {
if (!this.isEnabled) return;
const enqueuedProductionIds = this.productionQueue.getAll();
for (const id of enqueuedProductionIds) {
this.productionQueue.remove(id);
}
const executionIds = Object.entries(activeExecutions)
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
.map(([executionId, _]) => executionId);
if (executionIds.length === 0) return;
await this.executionRepository.cancelMany(executionIds);
this.logger.info('Canceled enqueued executions with response promises', { executionIds });
}
// ----------------------------------
// private
// ----------------------------------
private logInit() {
this.log('Enabled');
this.log(
[
'Production execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
].join(' '),
);
}
private isUnlimited(mode: ExecutionMode) {
if (
mode === 'error' ||
mode === 'integrated' ||
mode === 'cli' ||
mode === 'internal' ||
mode === 'manual' ||
mode === 'retry'
) {
return true;
}
if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1;
throw new UnknownExecutionModeError(mode);
}
private log(message: string, meta?: object) {
this.logger.debug(['[Concurrency Control]', message].join(' '), meta);
}
private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}
}

View file

@ -0,0 +1,59 @@
import { Service } from 'typedi';
import { EventEmitter } from 'node:events';
@Service()
export class ConcurrencyQueue extends EventEmitter {
private readonly queue: Array<{
executionId: string;
resolve: () => void;
}> = [];
constructor(private capacity: number) {
super();
}
async enqueue(executionId: string) {
this.capacity--;
if (this.capacity < 0) {
this.emit('execution-throttled', { executionId, capacity: this.capacity });
// eslint-disable-next-line @typescript-eslint/return-await
return new Promise<void>((resolve) => this.queue.push({ executionId, resolve }));
}
}
dequeue() {
this.capacity++;
this.resolveNext();
}
remove(executionId: string) {
const index = this.queue.findIndex((item) => item.executionId === executionId);
if (index > -1) {
this.queue.splice(index, 1);
this.capacity++;
this.resolveNext();
}
}
getAll() {
return new Set(...this.queue.map((item) => item.executionId));
}
private resolveNext() {
const item = this.queue.shift();
if (!item) return;
const { resolve, executionId } = item;
this.emit('execution-released', executionId);
resolve();
}
}

View file

@ -254,6 +254,15 @@ export const schema = {
env: 'EXECUTIONS_MODE',
},
concurrency: {
productionLimit: {
doc: "Max production executions allowed to run concurrently, in main process for regular mode and in worker for queue mode. Default for main mode is `-1` (disabled). Default for queue mode is taken from the worker's `--concurrency` flag.",
format: Number,
default: -1,
env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT',
},
},
// A Workflow times out and gets canceled after this time (seconds).
// If the workflow is executed in the main process a soft timeout
// is executed (takes effect after the current node finishes).

View file

@ -285,6 +285,10 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await this.update({ id: executionId }, { status });
}
async resetStartedAt(executionId: string) {
await this.update({ id: executionId }, { startedAt: new Date() });
}
async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
// Se isolate startedAt because it must be set when the execution starts and should never change.
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
@ -597,6 +601,14 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
}
async cancel(executionId: string) {
await this.update({ id: executionId }, { status: 'canceled', stoppedAt: new Date() });
}
async cancelMany(executionIds: string[]) {
await this.update({ id: In(executionIds) }, { status: 'canceled', stoppedAt: new Date() });
}
// ----------------------------------
// new API
// ----------------------------------
@ -717,6 +729,8 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
if (query.order?.stoppedAt === 'DESC') {
qb.orderBy({ 'execution.stoppedAt': 'DESC' });
} else if (query.order?.top) {
qb.orderBy(`(CASE WHEN execution.status = '${query.order.top}' THEN 0 ELSE 1 END)`);
} else {
qb.orderBy({ 'execution.id': 'DESC' });
}

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class InvalidConcurrencyLimitError extends ApplicationError {
constructor(value: number) {
super('Concurrency limit set to invalid value', { level: 'warning', extra: { value } });
}
}

View file

@ -0,0 +1,9 @@
import { ApplicationError } from 'n8n-workflow';
export class QueuedExecutionRetryError extends ApplicationError {
constructor() {
super('Execution is queued to run (not yet started) so it cannot be retried', {
level: 'warning',
});
}
}

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class UnknownExecutionModeError extends ApplicationError {
constructor(mode: string) {
super('Unknown execution mode', { extra: { mode } });
}
}

View file

@ -140,7 +140,7 @@ export class MessageEventBus extends EventEmitter {
const dbUnfinishedExecutionIds = (
await this.executionRepository.find({
where: {
status: In(['running', 'new', 'unknown']),
status: In(['running', 'unknown']),
},
select: ['id'],
})

View file

@ -37,6 +37,8 @@ import { NotFoundError } from '@/errors/response-errors/not-found.error';
import config from '@/config';
import { WaitTracker } from '@/WaitTracker';
import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error';
export const schemaGetExecutionsQueryFilter = {
@ -87,6 +89,7 @@ export class ExecutionService {
private readonly nodeTypes: NodeTypes,
private readonly waitTracker: WaitTracker,
private readonly workflowRunner: WorkflowRunner,
private readonly concurrencyControl: ConcurrencyControlService,
) {}
async findOne(
@ -133,6 +136,8 @@ export class ExecutionService {
throw new NotFoundError(`The execution with the ID "${executionId}" does not exist.`);
}
if (execution.status === 'new') throw new QueuedExecutionRetryError();
if (!execution.data.executionData) throw new AbortedExecutionRetryError();
if (execution.finished) {
@ -244,14 +249,10 @@ export class ExecutionService {
}
}
return await this.executionRepository.deleteExecutionsByFilter(
requestFilters,
sharedWorkflowIds,
{
deleteBefore,
ids,
},
);
await this.executionRepository.deleteExecutionsByFilter(requestFilters, sharedWorkflowIds, {
deleteBefore,
ids,
});
}
async createErrorExecution(
@ -359,31 +360,37 @@ export class ExecutionService {
}
/**
* Find summaries of active and finished executions that satisfy a query.
* Return:
*
* Return also the total count of all finished executions that satisfy the query,
* and whether the total is an estimate or not. Active executions are excluded
* from the total and count for pagination purposes.
* - the latest summaries of current and completed executions that satisfy a query,
* - the total count of all completed executions that satisfy the query, and
* - whether the total of completed executions is an estimate.
*
* By default, "current" means executions starting and running. With concurrency
* control, "current" means executions enqueued to start and running.
*/
async findAllRunningAndLatest(query: ExecutionSummaries.RangeQuery) {
const currentlyRunningStatuses: ExecutionStatus[] = ['new', 'running'];
const allStatuses = new Set(ExecutionStatusList);
currentlyRunningStatuses.forEach((status) => allStatuses.delete(status));
const notRunningStatuses: ExecutionStatus[] = Array.from(allStatuses);
async findLatestCurrentAndCompleted(query: ExecutionSummaries.RangeQuery) {
const currentStatuses: ExecutionStatus[] = ['new', 'running'];
const [activeResult, finishedResult] = await Promise.all([
this.findRangeWithCount({ ...query, status: currentlyRunningStatuses }),
const completedStatuses = ExecutionStatusList.filter((s) => !currentStatuses.includes(s));
const [current, completed] = await Promise.all([
this.findRangeWithCount({
...query,
status: notRunningStatuses,
status: currentStatuses,
order: { top: 'running' }, // ensure limit cannot exclude running
}),
this.findRangeWithCount({
...query,
status: completedStatuses,
order: { stoppedAt: 'DESC' },
}),
]);
return {
results: activeResult.results.concat(finishedResult.results),
count: finishedResult.count,
estimated: finishedResult.estimated,
results: current.results.concat(completed.results),
count: completed.count, // exclude current from count for pagination
estimated: completed.estimated,
};
}
@ -395,6 +402,13 @@ export class ExecutionService {
if (!execution) throw new NotFoundError('Execution not found');
if (execution.status === 'new') {
this.concurrencyControl.remove({ mode: execution.mode, executionId });
await this.executionRepository.cancel(executionId);
return;
}
const stopResult = await this.activeExecutions.stopExecution(execution.id);
if (stopResult) return this.toExecutionStopResult(execution);
@ -432,4 +446,15 @@ export class ExecutionService {
status: execution.status,
};
}
async findAllEnqueuedExecutions() {
return await this.executionRepository.findMultipleExecutions(
{
select: ['id', 'mode'],
where: { status: 'new' },
order: { id: 'ASC' },
},
{ includeData: true, unflattenData: true },
);
}
}

View file

@ -79,7 +79,8 @@ export namespace ExecutionSummaries {
type OrderFields = {
order?: {
stoppedAt: 'DESC';
top?: ExecutionStatus;
stoppedAt?: 'DESC';
};
};
}

View file

@ -53,7 +53,7 @@ export class ExecutionsController {
const noRange = !query.range.lastId || !query.range.firstId;
if (noStatus && noRange) {
return await this.executionService.findAllRunningAndLatest(query);
return await this.executionService.findLatestCurrentAndCompleted(query);
}
return await this.executionService.findRangeWithCount(query);

View file

@ -21,6 +21,9 @@ import { getLdapSynchronizations, saveLdapSynchronization } from '@/Ldap/helpers
import { createLdapConfig } from '../../shared/ldap';
import { LdapService } from '@/Ldap/ldap.service';
import { v4 as uuid } from 'uuid';
import { Telemetry } from '@/telemetry';
mockInstance(Telemetry);
const oclifConfig = new Config({ root: __dirname });

View file

@ -9,9 +9,12 @@ import type { SourceControlledFile } from '@/environments/sourceControl/types/so
import * as utils from '../shared/utils/';
import { createUser } from '../shared/db/users';
import type { SuperAgentTest } from '../shared/types';
import { mockInstance } from '@test/mocking';
import { Telemetry } from '@/telemetry';
let authOwnerAgent: SuperAgentTest;
let owner: User;
mockInstance(Telemetry);
const testServer = utils.setupTestServer({
endpointGroups: ['sourceControl', 'license', 'auth'],

View file

@ -27,6 +27,7 @@ describe('ExecutionService', () => {
mock(),
mock(),
mock(),
mock(),
);
});
@ -344,17 +345,17 @@ describe('ExecutionService', () => {
});
});
describe('findAllActiveAndLatestFinished', () => {
test('should return all active and latest 20 finished executions', async () => {
describe('findLatestCurrentAndCompleted', () => {
test('should return latest current and completed executions', async () => {
const workflow = await createWorkflow();
const totalFinished = 21;
const totalCompleted = 21;
await Promise.all([
createExecution({ status: 'running' }, workflow),
createExecution({ status: 'running' }, workflow),
createExecution({ status: 'running' }, workflow),
...new Array(totalFinished)
...new Array(totalCompleted)
.fill(null)
.map(async () => await createExecution({ status: 'success' }, workflow)),
]);
@ -365,14 +366,14 @@ describe('ExecutionService', () => {
accessibleWorkflowIds: [workflow.id],
};
const output = await executionService.findAllRunningAndLatest(query);
const output = await executionService.findLatestCurrentAndCompleted(query);
expect(output.results).toHaveLength(23); // 3 active + 20 finished (excludes 21st)
expect(output.count).toBe(totalFinished); // 21 finished, excludes active
expect(output.results).toHaveLength(23); // 3 current + 20 completed (excludes 21st)
expect(output.count).toBe(totalCompleted); // 21 finished, excludes current
expect(output.estimated).toBe(false);
});
test('should handle zero active executions', async () => {
test('should handle zero current executions', async () => {
const workflow = await createWorkflow();
const totalFinished = 5;
@ -389,14 +390,14 @@ describe('ExecutionService', () => {
accessibleWorkflowIds: [workflow.id],
};
const output = await executionService.findAllRunningAndLatest(query);
const output = await executionService.findLatestCurrentAndCompleted(query);
expect(output.results).toHaveLength(totalFinished); // 5 finished
expect(output.count).toBe(totalFinished); // 5 finished, excludes active
expect(output.estimated).toBe(false);
});
test('should handle zero finished executions', async () => {
test('should handle zero completed executions', async () => {
const workflow = await createWorkflow();
await Promise.all([
@ -411,7 +412,7 @@ describe('ExecutionService', () => {
accessibleWorkflowIds: [workflow.id],
};
const output = await executionService.findAllRunningAndLatest(query);
const output = await executionService.findLatestCurrentAndCompleted(query);
expect(output.results).toHaveLength(3); // 3 finished
expect(output.count).toBe(0); // 0 finished, excludes active
@ -427,11 +428,36 @@ describe('ExecutionService', () => {
accessibleWorkflowIds: [workflow.id],
};
const output = await executionService.findAllRunningAndLatest(query);
const output = await executionService.findLatestCurrentAndCompleted(query);
expect(output.results).toHaveLength(0);
expect(output.count).toBe(0);
expect(output.estimated).toBe(false);
});
test('should prioritize `running` over `new` executions', async () => {
const workflow = await createWorkflow();
await Promise.all([
createExecution({ status: 'new' }, workflow),
createExecution({ status: 'new' }, workflow),
createExecution({ status: 'running' }, workflow),
createExecution({ status: 'running' }, workflow),
createExecution({ status: 'new' }, workflow),
createExecution({ status: 'new' }, workflow),
]);
const query: ExecutionSummaries.RangeQuery = {
kind: 'range',
range: { limit: 2 },
accessibleWorkflowIds: [workflow.id],
};
const { results } = await executionService.findLatestCurrentAndCompleted(query);
expect(results).toHaveLength(2);
expect(results[0].status).toBe('running');
expect(results[1].status).toBe('running');
});
});
});

View file

@ -5,8 +5,15 @@ import { createMember, createOwner } from './shared/db/users';
import { createWorkflow, shareWorkflowWithUsers } from './shared/db/workflows';
import * as testDb from './shared/testDb';
import { setupTestServer } from './shared/utils';
import { mockInstance } from '../shared/mocking';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { WaitTracker } from '@/WaitTracker';
import { createTeamProject, linkUserToProject } from './shared/db/projects';
mockInstance(WaitTracker);
mockInstance(ConcurrencyControlService, { isEnabled: false });
const testServer = setupTestServer({ endpointGroups: ['executions'] });
let owner: User;

View file

@ -17,6 +17,8 @@ import {
createWaitingExecution,
} from '../shared/db/executions';
import type { SuperAgentTest } from '../shared/types';
import { mockInstance } from '@test/mocking';
import { Telemetry } from '@/telemetry';
let owner: User;
let user1: User;
@ -26,6 +28,8 @@ let authUser1Agent: SuperAgentTest;
let authUser2Agent: SuperAgentTest;
let workflowRunner: ActiveWorkflowManager;
mockInstance(Telemetry);
const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] });
beforeAll(async () => {

View file

@ -20,6 +20,9 @@ import { createWorkflow, createWorkflowWithTrigger } from '../shared/db/workflow
import { createTag } from '../shared/db/tags';
import { mockInstance } from '../../shared/mocking';
import type { SuperAgentTest } from '../shared/types';
import { Telemetry } from '@/telemetry';
mockInstance(Telemetry);
let owner: User;
let ownerPersonalProject: Project;

View file

@ -27,7 +27,9 @@ import * as testDb from './shared/testDb';
import { mockInstance } from '../shared/mocking';
import type { SuperAgentTest } from './shared/types';
import { createTeamProject, getPersonalProject, linkUserToProject } from './shared/db/projects';
import { Telemetry } from '@/telemetry';
mockInstance(Telemetry);
mockInstance(ExecutionService);
const testServer = utils.setupTestServer({

View file

@ -15,6 +15,9 @@ import * as testDb from './shared/testDb';
import { createUser } from './shared/db/users';
import { createWorkflow } from './shared/db/workflows';
import type { SuperAgentTest } from './shared/types';
import { Telemetry } from '@/telemetry';
mockInstance(Telemetry);
describe('Webhook API', () => {
mockInstance(ExternalHooks);

View file

@ -5,6 +5,10 @@ import * as testDb from '../shared/testDb';
import { createUser } from '../shared/db/users';
import { createWorkflowWithTrigger } from '../shared/db/workflows';
import { createTeamProject } from '../shared/db/projects';
import { mockInstance } from '@test/mocking';
import { Telemetry } from '@/telemetry';
mockInstance(Telemetry);
let member: User;
let anotherMember: User;

View file

@ -6,6 +6,8 @@ import { createDeferredPromise } from 'n8n-workflow';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { ExecutionRepository } from '@db/repositories/execution.repository';
import { mock } from 'jest-mock-extended';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { mockInstance } from '@test/mocking';
const FAKE_EXECUTION_ID = '15';
const FAKE_SECOND_EXECUTION_ID = '20';
@ -18,11 +20,13 @@ const executionRepository = mock<ExecutionRepository>({
createNewExecution,
});
const concurrencyControl = mockInstance(ConcurrencyControlService, { isEnabled: false });
describe('ActiveExecutions', () => {
let activeExecutions: ActiveExecutions;
beforeEach(() => {
activeExecutions = new ActiveExecutions(mock(), executionRepository);
activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl);
});
afterEach(() => {

View file

@ -9,12 +9,16 @@ import { setupTestServer } from '../integration/shared/utils';
import { createUser } from '../integration/shared/db/users';
import { createWorkflow } from '../integration/shared/db/workflows';
import { createExecution } from '../integration/shared/db/executions';
import { mockInstance } from '@test/mocking';
import { Telemetry } from '@/telemetry';
let owner: User;
let runner: WorkflowRunner;
let hookFunctions: IWorkflowExecuteHooks;
setupTestServer({ endpointGroups: [] });
mockInstance(Telemetry);
class Watchers {
workflowExecuteAfter = jest.fn();
}

View file

@ -79,13 +79,13 @@ describe('ExecutionsController', () => {
'should fetch executions per query',
async (rangeQuery) => {
workflowSharingService.getSharedWorkflowIds.mockResolvedValue(['123']);
executionService.findAllRunningAndLatest.mockResolvedValue(NO_EXECUTIONS);
executionService.findLatestCurrentAndCompleted.mockResolvedValue(NO_EXECUTIONS);
const req = mock<ExecutionRequest.GetMany>({ rangeQuery });
await executionsController.getMany(req);
expect(executionService.findAllRunningAndLatest).not.toHaveBeenCalled();
expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled();
expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery);
},
);
@ -96,13 +96,13 @@ describe('ExecutionsController', () => {
'should fetch executions per query',
async (rangeQuery) => {
workflowSharingService.getSharedWorkflowIds.mockResolvedValue(['123']);
executionService.findAllRunningAndLatest.mockResolvedValue(NO_EXECUTIONS);
executionService.findLatestCurrentAndCompleted.mockResolvedValue(NO_EXECUTIONS);
const req = mock<ExecutionRequest.GetMany>({ rangeQuery });
await executionsController.getMany(req);
expect(executionService.findAllRunningAndLatest).toHaveBeenCalled();
expect(executionService.findLatestCurrentAndCompleted).toHaveBeenCalled();
expect(executionService.findRangeWithCount).not.toHaveBeenCalled();
},
);
@ -111,7 +111,7 @@ describe('ExecutionsController', () => {
describe('if both status and range provided', () => {
it('should fetch executions per query', async () => {
workflowSharingService.getSharedWorkflowIds.mockResolvedValue(['123']);
executionService.findAllRunningAndLatest.mockResolvedValue(NO_EXECUTIONS);
executionService.findLatestCurrentAndCompleted.mockResolvedValue(NO_EXECUTIONS);
const rangeQuery: ExecutionSummaries.RangeQuery = {
kind: 'range',
@ -124,7 +124,7 @@ describe('ExecutionsController', () => {
await executionsController.getMany(req);
expect(executionService.findAllRunningAndLatest).not.toHaveBeenCalled();
expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled();
expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery);
});
});

View file

@ -1,13 +1,13 @@
export const ExecutionStatusList = [
'canceled' as const,
'crashed' as const,
'error' as const,
'new' as const,
'running' as const,
'success' as const,
'unknown' as const,
'waiting' as const,
'warning' as const,
];
'canceled',
'crashed',
'error',
'new',
'running',
'success',
'unknown',
'waiting',
'warning',
] as const;
export type ExecutionStatus = (typeof ExecutionStatusList)[number];