wip: Refactor concurrency control service to support multiple types of concurrency limits

This commit is contained in:
Eugene Molodkin 2024-12-27 14:51:34 +01:00
parent 3d63aa11a5
commit 6576357376
No known key found for this signature in database
4 changed files with 390 additions and 188 deletions

View file

@ -1,6 +1,7 @@
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { ConcurrencyType } from '@/concurrency/concurrency-control.service';
import { import {
CLOUD_TEMP_PRODUCTION_LIMIT, CLOUD_TEMP_PRODUCTION_LIMIT,
CLOUD_TEMP_REPORTABLE_THRESHOLDS, CLOUD_TEMP_REPORTABLE_THRESHOLDS,
@ -24,17 +25,20 @@ describe('ConcurrencyControlService', () => {
afterEach(() => { afterEach(() => {
config.set('executions.concurrency.productionLimit', -1); config.set('executions.concurrency.productionLimit', -1);
config.set('executions.concurrency.evaluationLimit', -1);
config.set('executions.mode', 'integrated'); config.set('executions.mode', 'integrated');
jest.clearAllMocks(); jest.clearAllMocks();
}); });
describe('constructor', () => { describe('constructor', () => {
it('should be enabled if production cap is positive', () => { it.each(['production', 'evaluation'])(
'should be enabled if %s cap is positive',
(type: ConcurrencyType) => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); config.set(`executions.concurrency.${type}Limit`, 1);
/** /**
* Act * Act
@ -52,14 +56,17 @@ describe('ConcurrencyControlService', () => {
// @ts-expect-error Private property // @ts-expect-error Private property
expect(service.isEnabled).toBe(true); expect(service.isEnabled).toBe(true);
// @ts-expect-error Private property // @ts-expect-error Private property
expect(service.productionQueue).toBeDefined(); expect(service.queues.get(type)).toBeDefined();
}); },
);
it('should throw if production cap is 0', () => { it.each(['production', 'evaluation'])(
'should throw if %s cap is 0',
(type: ConcurrencyType) => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 0); config.set(`executions.concurrency.${type}Limit`, 0);
try { try {
/** /**
@ -72,13 +79,15 @@ describe('ConcurrencyControlService', () => {
*/ */
expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); expect(error).toBeInstanceOf(InvalidConcurrencyLimitError);
} }
}); },
);
it('should be disabled if production cap is -1', () => { it('should be disabled if both production and evaluation caps are -1', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', -1); config.set('executions.concurrency.productionLimit', -1);
config.set('executions.concurrency.evaluationLimit', -1);
/** /**
* Act * Act
@ -97,11 +106,13 @@ describe('ConcurrencyControlService', () => {
expect(service.isEnabled).toBe(false); expect(service.isEnabled).toBe(false);
}); });
it('should be disabled if production cap is lower than -1', () => { it.each(['production', 'evaluation'])(
'should be disabled if %s cap is lower than -1',
(type: ConcurrencyType) => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', -2); config.set(`executions.concurrency.${type}Limit`, -2);
/** /**
* Act * Act
@ -118,7 +129,8 @@ describe('ConcurrencyControlService', () => {
*/ */
// @ts-expect-error Private property // @ts-expect-error Private property
expect(service.isEnabled).toBe(false); expect(service.isEnabled).toBe(false);
}); },
);
it('should be disabled on queue mode', () => { it('should be disabled on queue mode', () => {
/** /**
@ -203,6 +215,31 @@ describe('ConcurrencyControlService', () => {
*/ */
expect(enqueueSpy).toHaveBeenCalled(); expect(enqueueSpy).toHaveBeenCalled();
}); });
it('should enqueue on evaluation mode', async () => {
/**
* Arrange
*/
config.set('executions.concurrency.evaluationLimit', 1);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
* Act
*/
await service.throttle({ mode: 'evaluation', executionId: '1' });
/**
* Assert
*/
expect(enqueueSpy).toHaveBeenCalled();
});
}); });
describe('release', () => { describe('release', () => {
@ -258,6 +295,31 @@ describe('ConcurrencyControlService', () => {
*/ */
expect(dequeueSpy).toHaveBeenCalled(); expect(dequeueSpy).toHaveBeenCalled();
}); });
it('should dequeue on evaluation mode', () => {
/**
* Arrange
*/
config.set('executions.concurrency.evaluationLimit', 1);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
* Act
*/
service.release({ mode: 'evaluation' });
/**
* Assert
*/
expect(dequeueSpy).toHaveBeenCalled();
});
}); });
describe('remove', () => { describe('remove', () => {
@ -316,14 +378,41 @@ describe('ConcurrencyControlService', () => {
expect(removeSpy).toHaveBeenCalled(); expect(removeSpy).toHaveBeenCalled();
}, },
); );
});
describe('removeAll', () => { it('should remove an execution on evaluation mode', () => {
it('should remove all executions from the production queue', async () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 2); config.set('executions.concurrency.evaluationLimit', 1);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
* Act
*/
service.remove({ mode: 'evaluation', executionId: '1' });
/**
* Assert
*/
expect(removeSpy).toHaveBeenCalled();
});
});
describe('removeAll', () => {
it.each(['production', 'evaluation'])(
'should remove all executions from the %s queue',
async (type: ConcurrencyType) => {
/**
* Arrange
*/
config.set(`executions.concurrency.${type}Limit`, 2);
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@ -353,6 +442,61 @@ describe('ConcurrencyControlService', () => {
expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); expect(removeSpy).toHaveBeenNthCalledWith(1, '1');
expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); expect(removeSpy).toHaveBeenNthCalledWith(2, '2');
expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); expect(removeSpy).toHaveBeenNthCalledWith(3, '3');
},
);
});
describe('get queue', () => {
it('should choose the production queue', async () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 2);
config.set('executions.concurrency.evaluationLimit', 2);
/**
* Act
*/
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
// @ts-expect-error Private property
const queue = service.getQueue('webhook');
/**
* Assert
*/
// @ts-expect-error Private property
expect(queue).toEqual(service.queues.get('production'));
});
it('should choose the evaluation queue', async () => {
/**
* Arrange
*/
config.set('executions.concurrency.productionLimit', 2);
config.set('executions.concurrency.evaluationLimit', 2);
/**
* Act
*/
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
// @ts-expect-error Private property
const queue = service.getQueue('evaluation');
/**
* Assert
*/
// @ts-expect-error Private property
expect(queue).toEqual(service.queues.get('evaluation'));
}); });
}); });
}); });
@ -388,6 +532,32 @@ describe('ConcurrencyControlService', () => {
*/ */
expect(enqueueSpy).not.toHaveBeenCalled(); expect(enqueueSpy).not.toHaveBeenCalled();
}); });
it('should do nothing for evaluation executions', async () => {
/**
* Arrange
*/
config.set('executions.concurrency.evaluationLimit', -1);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
* Act
*/
await service.throttle({ mode: 'evaluation', executionId: '1' });
await service.throttle({ mode: 'evaluation', executionId: '2' });
/**
* Assert
*/
expect(enqueueSpy).not.toHaveBeenCalled();
});
}); });
describe('release', () => { describe('release', () => {
@ -415,6 +585,31 @@ describe('ConcurrencyControlService', () => {
*/ */
expect(dequeueSpy).not.toHaveBeenCalled(); expect(dequeueSpy).not.toHaveBeenCalled();
}); });
it('should do nothing for evaluation executions', () => {
/**
* Arrange
*/
config.set('executions.concurrency.evaluationLimit', -1);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
* Act
*/
service.release({ mode: 'evaluation' });
/**
* Assert
*/
expect(dequeueSpy).not.toHaveBeenCalled();
});
}); });
describe('remove', () => { describe('remove', () => {
@ -442,6 +637,31 @@ describe('ConcurrencyControlService', () => {
*/ */
expect(removeSpy).not.toHaveBeenCalled(); expect(removeSpy).not.toHaveBeenCalled();
}); });
it('should do nothing for evaluation executions', () => {
/**
* Arrange
*/
config.set('executions.concurrency.evaluationLimit', -1);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventService,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
* Act
*/
service.remove({ mode: 'evaluation', executionId: '1' });
/**
* Assert
*/
expect(removeSpy).not.toHaveBeenCalled();
});
}); });
}); });
@ -470,14 +690,17 @@ describe('ConcurrencyControlService', () => {
* Act * Act
*/ */
// @ts-expect-error Private property // @ts-expect-error Private property
service.productionQueue.emit('concurrency-check', { service.queues.get('production').emit('concurrency-check', {
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
}); });
/** /**
* Assert * Assert
*/ */
expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold }); expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', {
threshold,
concurrencyType: 'production',
});
}, },
); );
@ -500,7 +723,7 @@ describe('ConcurrencyControlService', () => {
* Act * Act
*/ */
// @ts-expect-error Private property // @ts-expect-error Private property
service.productionQueue.emit('concurrency-check', { service.queues.get('production').emit('concurrency-check', {
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
}); });
@ -532,7 +755,7 @@ describe('ConcurrencyControlService', () => {
* Act * Act
*/ */
// @ts-expect-error Private property // @ts-expect-error Private property
service.productionQueue.emit('concurrency-check', { service.queues.get('production').emit('concurrency-check', {
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
}); });

View file

@ -1,3 +1,4 @@
import { capitalize } from 'lodash';
import { Logger } from 'n8n-core'; import { Logger } from 'n8n-core';
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import { Service } from 'typedi'; import { Service } from 'typedi';
@ -15,21 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue';
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999; export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
export type ConcurrencyType = 'production' | 'evaluation';
@Service() @Service()
export class ConcurrencyControlService { export class ConcurrencyControlService {
private isEnabled: boolean; private isEnabled: boolean;
// private readonly limits: Map<ExecutionMode, number>; private readonly limits: Map<ConcurrencyType, number>;
private readonly productionLimit: number; private readonly queues: Map<ConcurrencyType, ConcurrencyQueue>;
private readonly evaluationLimit: number;
// private readonly queues: Map<ExecutionMode, ConcurrencyQueue>;
private readonly productionQueue: ConcurrencyQueue;
private readonly evaluationQueue: ConcurrencyQueue;
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
@ -43,76 +38,66 @@ export class ConcurrencyControlService {
) { ) {
this.logger = this.logger.scoped('concurrency'); this.logger = this.logger.scoped('concurrency');
this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); this.limits = new Map([
['production', config.getEnv('executions.concurrency.productionLimit')],
['evaluation', config.getEnv('executions.concurrency.evaluationLimit')],
]);
this.evaluationLimit = config.getEnv('executions.concurrency.evaluationLimit'); this.limits.forEach((limit, type) => {
if (limit === 0) {
if (this.productionLimit === 0) { throw new InvalidConcurrencyLimitError(limit);
throw new InvalidConcurrencyLimitError(this.productionLimit);
} }
if (this.evaluationLimit === 0) { if (limit < -1) {
throw new InvalidConcurrencyLimitError(this.evaluationLimit); this.limits.set(type, -1);
} }
});
if (this.productionLimit < -1) { if (
this.productionLimit = -1; Array.from(this.limits.values()).every((limit) => limit === -1) ||
} config.getEnv('executions.mode') === 'queue'
) {
if (this.evaluationLimit < -1) {
this.evaluationLimit = -1;
}
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
this.isEnabled = false; this.isEnabled = false;
return; return;
} }
this.productionQueue = new ConcurrencyQueue(this.productionLimit); this.queues = new Map();
this.limits.forEach((limit, type) => {
this.evaluationQueue = new ConcurrencyQueue(this.evaluationLimit); this.queues.set(type, new ConcurrencyQueue(limit));
});
this.logInit(); this.logInit();
this.isEnabled = true; this.isEnabled = true;
this.productionQueue.on('concurrency-check', ({ capacity }) => { this.queues.forEach((queue, type) => {
queue.on('concurrency-check', ({ capacity }) => {
if (this.shouldReport(capacity)) { if (this.shouldReport(capacity)) {
this.telemetry.track('User hit concurrency limit', { this.telemetry.track('User hit concurrency limit', {
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
concurrencyType: type,
}); });
} }
}); });
this.productionQueue.on('execution-throttled', ({ executionId }) => { queue.on('execution-throttled', ({ executionId }) => {
this.logger.debug('Execution throttled', { executionId }); this.logger.debug('Execution throttled', { executionId, type });
this.eventService.emit('execution-throttled', { executionId }); this.eventService.emit('execution-throttled', { executionId, type });
}); });
this.productionQueue.on('execution-released', async (executionId) => { queue.on('execution-released', async (executionId) => {
this.logger.debug('Execution released', { executionId }); this.logger.debug('Execution released', { executionId, type });
}); });
this.evaluationQueue.on('execution-throttled', ({ executionId }) => {
this.logger.debug('Evaluation execution throttled', { executionId });
this.eventService.emit('execution-throttled', { executionId });
});
this.evaluationQueue.on('execution-released', async (executionId) => {
this.logger.debug('Evaluation execution released', { executionId });
}); });
} }
/** /**
* Check whether an execution is in the production queue. * Check whether an execution is in any of the queues.
*/ */
has(executionId: string) { has(executionId: string) {
if (!this.isEnabled) return false; if (!this.isEnabled) return false;
return ( return Array.from(this.queues.values()).some((queue) => queue.getAll().has(executionId));
this.productionQueue.getAll().has(executionId) ||
this.evaluationQueue.getAll().has(executionId)
);
} }
/** /**
@ -121,23 +106,23 @@ export class ConcurrencyControlService {
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return; if (!this.isEnabled || this.isUnlimited(mode)) return;
if (mode === 'evaluation') { const queue = this.getQueue(mode);
await this.evaluationQueue.enqueue(executionId);
} else { if (queue) {
await this.productionQueue.enqueue(executionId); await queue.enqueue(executionId);
} }
} }
/** /**
* Release capacity back so the next execution in the production queue can proceed. * Release capacity back so the next execution in the queue can proceed.
*/ */
release({ mode }: { mode: ExecutionMode }) { release({ mode }: { mode: ExecutionMode }) {
if (!this.isEnabled || this.isUnlimited(mode)) return; if (!this.isEnabled || this.isUnlimited(mode)) return;
if (mode === 'evaluation') { const queue = this.getQueue(mode);
this.evaluationQueue.dequeue();
} else { if (queue) {
this.productionQueue.dequeue(); queue.dequeue();
} }
} }
@ -147,10 +132,10 @@ export class ConcurrencyControlService {
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return; if (!this.isEnabled || this.isUnlimited(mode)) return;
if (mode === 'evaluation') { const queue = this.getQueue(mode);
this.evaluationQueue.remove(executionId);
} else { if (queue) {
this.productionQueue.remove(executionId); queue.remove(executionId);
} }
} }
@ -162,17 +147,13 @@ export class ConcurrencyControlService {
async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) { async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) {
if (!this.isEnabled) return; if (!this.isEnabled) return;
const enqueuedProductionIds = this.productionQueue.getAll(); this.queues.forEach((queue) => {
const enqueuedExecutionIds = queue.getAll();
for (const id of enqueuedProductionIds) { for (const id of enqueuedExecutionIds) {
this.productionQueue.remove(id); queue.remove(id);
}
const enqueuedEvaluationIds = this.evaluationQueue.getAll();
for (const id of enqueuedEvaluationIds) {
this.evaluationQueue.remove(id);
} }
});
const executionIds = Object.entries(activeExecutions) const executionIds = Object.entries(activeExecutions)
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
@ -196,22 +177,30 @@ export class ConcurrencyControlService {
private logInit() { private logInit() {
this.logger.debug('Enabled'); this.logger.debug('Enabled');
this.limits.forEach((limit, type) => {
this.logger.debug( this.logger.debug(
[ [
'Production execution concurrency is', `${capitalize(type)} execution concurrency is`,
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), limit === -1 ? 'unlimited' : 'limited to ' + limit.toString(),
].join(' '),
);
this.logger.debug(
[
'Evaluation execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.evaluationLimit.toString(),
].join(' '), ].join(' '),
); );
});
} }
private isUnlimited(mode: ExecutionMode) { private isUnlimited(mode: ExecutionMode) {
const queue = this.getQueue(mode);
return queue === undefined;
}
private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}
/**
* Get the concurrency queue based on the execution mode.
*/
private getQueue(mode: ExecutionMode) {
if ( if (
mode === 'error' || mode === 'error' ||
mode === 'integrated' || mode === 'integrated' ||
@ -220,25 +209,13 @@ export class ConcurrencyControlService {
mode === 'manual' || mode === 'manual' ||
mode === 'retry' mode === 'retry'
) { ) {
return true; return undefined;
} }
if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; if (mode === 'webhook' || mode === 'trigger') return this.queues.get('production');
if (mode === 'evaluation') return this.evaluationLimit === -1; if (mode === 'evaluation') return this.queues.get('evaluation');
throw new UnknownExecutionModeError(mode); throw new UnknownExecutionModeError(mode);
} }
private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}
// private getQueue(mode: ExecutionMode) {
// if (['production', 'evaluation'].includes(mode)) {
// return this.queues.get(mode);
// }
//
// throw new UnknownExecutionModeError(mode);
// }
} }

View file

@ -38,7 +38,7 @@ export const schema = {
evaluationLimit: { evaluationLimit: {
doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.', doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.',
format: Number, format: Number,
default: 1, default: -1,
env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', env: 'N8N_CONCURRENCY_EVALUATION_LIMIT',
}, },
}, },

View file

@ -12,6 +12,7 @@ import type { GlobalRole, User } from '@/databases/entities/user';
import type { IWorkflowDb } from '@/interfaces'; import type { IWorkflowDb } from '@/interfaces';
import type { AiEventMap } from './ai.event-map'; import type { AiEventMap } from './ai.event-map';
import { ConcurrencyType } from '@/concurrency/concurrency-control.service';
export type UserLike = { export type UserLike = {
id: string; id: string;
@ -338,6 +339,7 @@ export type RelayEventMap = {
'execution-throttled': { 'execution-throttled': {
executionId: string; executionId: string;
type: ConcurrencyType;
}; };
'execution-started-during-bootup': { 'execution-started-during-bootup': {