mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
feat(core): Separate concurrency limits for production and evaluation executions (no-changelog) (#12387)
This commit is contained in:
parent
be520b4f60
commit
ce22f065c2
|
@ -1,6 +1,7 @@
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
|
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import type { ConcurrencyQueueType } from '@/concurrency/concurrency-control.service';
|
||||||
import {
|
import {
|
||||||
CLOUD_TEMP_PRODUCTION_LIMIT,
|
CLOUD_TEMP_PRODUCTION_LIMIT,
|
||||||
CLOUD_TEMP_REPORTABLE_THRESHOLDS,
|
CLOUD_TEMP_REPORTABLE_THRESHOLDS,
|
||||||
|
@ -24,61 +25,71 @@ describe('ConcurrencyControlService', () => {
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
config.set('executions.concurrency.productionLimit', -1);
|
config.set('executions.concurrency.productionLimit', -1);
|
||||||
|
config.set('executions.concurrency.evaluationLimit', -1);
|
||||||
config.set('executions.mode', 'integrated');
|
config.set('executions.mode', 'integrated');
|
||||||
|
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('constructor', () => {
|
describe('constructor', () => {
|
||||||
it('should be enabled if production cap is positive', () => {
|
it.each(['production', 'evaluation'])(
|
||||||
/**
|
'should be enabled if %s cap is positive',
|
||||||
* Arrange
|
(type: ConcurrencyQueueType) => {
|
||||||
*/
|
/**
|
||||||
config.set('executions.concurrency.productionLimit', 1);
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set(`executions.concurrency.${type}Limit`, 1);
|
||||||
|
|
||||||
/**
|
|
||||||
* Act
|
|
||||||
*/
|
|
||||||
const service = new ConcurrencyControlService(
|
|
||||||
logger,
|
|
||||||
executionRepository,
|
|
||||||
telemetry,
|
|
||||||
eventService,
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Assert
|
|
||||||
*/
|
|
||||||
// @ts-expect-error Private property
|
|
||||||
expect(service.isEnabled).toBe(true);
|
|
||||||
// @ts-expect-error Private property
|
|
||||||
expect(service.productionQueue).toBeDefined();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw if production cap is 0', () => {
|
|
||||||
/**
|
|
||||||
* Arrange
|
|
||||||
*/
|
|
||||||
config.set('executions.concurrency.productionLimit', 0);
|
|
||||||
|
|
||||||
try {
|
|
||||||
/**
|
/**
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
new ConcurrencyControlService(logger, executionRepository, telemetry, eventService);
|
const service = new ConcurrencyControlService(
|
||||||
} catch (error) {
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert
|
* Assert
|
||||||
*/
|
*/
|
||||||
expect(error).toBeInstanceOf(InvalidConcurrencyLimitError);
|
// @ts-expect-error Private property
|
||||||
}
|
expect(service.isEnabled).toBe(true);
|
||||||
});
|
// @ts-expect-error Private property
|
||||||
|
expect(service.queues.get(type)).toBeDefined();
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
expect(service.queues.size).toBe(1);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
it('should be disabled if production cap is -1', () => {
|
it.each(['production', 'evaluation'])(
|
||||||
|
'should throw if %s cap is 0',
|
||||||
|
(type: ConcurrencyQueueType) => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set(`executions.concurrency.${type}Limit`, 0);
|
||||||
|
|
||||||
|
try {
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
new ConcurrencyControlService(logger, executionRepository, telemetry, eventService);
|
||||||
|
} catch (error) {
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(error).toBeInstanceOf(InvalidConcurrencyLimitError);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
it('should be disabled if both production and evaluation caps are -1', () => {
|
||||||
/**
|
/**
|
||||||
* Arrange
|
* Arrange
|
||||||
*/
|
*/
|
||||||
config.set('executions.concurrency.productionLimit', -1);
|
config.set('executions.concurrency.productionLimit', -1);
|
||||||
|
config.set('executions.concurrency.evaluationLimit', -1);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Act
|
* Act
|
||||||
|
@ -97,28 +108,31 @@ describe('ConcurrencyControlService', () => {
|
||||||
expect(service.isEnabled).toBe(false);
|
expect(service.isEnabled).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should be disabled if production cap is lower than -1', () => {
|
it.each(['production', 'evaluation'])(
|
||||||
/**
|
'should be disabled if %s cap is lower than -1',
|
||||||
* Arrange
|
(type: ConcurrencyQueueType) => {
|
||||||
*/
|
/**
|
||||||
config.set('executions.concurrency.productionLimit', -2);
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set(`executions.concurrency.${type}Limit`, -2);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
const service = new ConcurrencyControlService(
|
const service = new ConcurrencyControlService(
|
||||||
logger,
|
logger,
|
||||||
executionRepository,
|
executionRepository,
|
||||||
telemetry,
|
telemetry,
|
||||||
eventService,
|
eventService,
|
||||||
);
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
// @ts-expect-error Private property
|
// @ts-expect-error Private property
|
||||||
expect(service.isEnabled).toBe(false);
|
expect(service.isEnabled).toBe(false);
|
||||||
});
|
},
|
||||||
|
);
|
||||||
|
|
||||||
it('should be disabled on queue mode', () => {
|
it('should be disabled on queue mode', () => {
|
||||||
/**
|
/**
|
||||||
|
@ -203,6 +217,31 @@ describe('ConcurrencyControlService', () => {
|
||||||
*/
|
*/
|
||||||
expect(enqueueSpy).toHaveBeenCalled();
|
expect(enqueueSpy).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should enqueue on evaluation mode', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.evaluationLimit', 1);
|
||||||
|
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
await service.throttle({ mode: 'evaluation', executionId: '1' });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(enqueueSpy).toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('release', () => {
|
describe('release', () => {
|
||||||
|
@ -258,6 +297,31 @@ describe('ConcurrencyControlService', () => {
|
||||||
*/
|
*/
|
||||||
expect(dequeueSpy).toHaveBeenCalled();
|
expect(dequeueSpy).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should dequeue on evaluation mode', () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.evaluationLimit', 1);
|
||||||
|
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
service.release({ mode: 'evaluation' });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(dequeueSpy).toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('remove', () => {
|
describe('remove', () => {
|
||||||
|
@ -316,14 +380,12 @@ describe('ConcurrencyControlService', () => {
|
||||||
expect(removeSpy).toHaveBeenCalled();
|
expect(removeSpy).toHaveBeenCalled();
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
});
|
|
||||||
|
|
||||||
describe('removeAll', () => {
|
it('should remove an execution on evaluation mode', () => {
|
||||||
it('should remove all executions from the production queue', async () => {
|
|
||||||
/**
|
/**
|
||||||
* Arrange
|
* Arrange
|
||||||
*/
|
*/
|
||||||
config.set('executions.concurrency.productionLimit', 2);
|
config.set('executions.concurrency.evaluationLimit', 1);
|
||||||
|
|
||||||
const service = new ConcurrencyControlService(
|
const service = new ConcurrencyControlService(
|
||||||
logger,
|
logger,
|
||||||
|
@ -331,28 +393,112 @@ describe('ConcurrencyControlService', () => {
|
||||||
telemetry,
|
telemetry,
|
||||||
eventService,
|
eventService,
|
||||||
);
|
);
|
||||||
|
|
||||||
jest
|
|
||||||
.spyOn(ConcurrencyQueue.prototype, 'getAll')
|
|
||||||
.mockReturnValueOnce(new Set(['1', '2', '3']));
|
|
||||||
|
|
||||||
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
|
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
await service.removeAll({
|
service.remove({ mode: 'evaluation', executionId: '1' });
|
||||||
'1': mock<IExecutingWorkflowData>(),
|
|
||||||
'2': mock<IExecutingWorkflowData>(),
|
|
||||||
'3': mock<IExecutingWorkflowData>(),
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert
|
* Assert
|
||||||
*/
|
*/
|
||||||
expect(removeSpy).toHaveBeenNthCalledWith(1, '1');
|
expect(removeSpy).toHaveBeenCalled();
|
||||||
expect(removeSpy).toHaveBeenNthCalledWith(2, '2');
|
});
|
||||||
expect(removeSpy).toHaveBeenNthCalledWith(3, '3');
|
});
|
||||||
|
|
||||||
|
describe('removeAll', () => {
|
||||||
|
it.each(['production', 'evaluation'])(
|
||||||
|
'should remove all executions from the %s queue',
|
||||||
|
async (type: ConcurrencyQueueType) => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set(`executions.concurrency.${type}Limit`, 2);
|
||||||
|
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
|
||||||
|
jest
|
||||||
|
.spyOn(ConcurrencyQueue.prototype, 'getAll')
|
||||||
|
.mockReturnValueOnce(new Set(['1', '2', '3']));
|
||||||
|
|
||||||
|
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
await service.removeAll({
|
||||||
|
'1': mock<IExecutingWorkflowData>(),
|
||||||
|
'2': mock<IExecutingWorkflowData>(),
|
||||||
|
'3': mock<IExecutingWorkflowData>(),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(removeSpy).toHaveBeenNthCalledWith(1, '1');
|
||||||
|
expect(removeSpy).toHaveBeenNthCalledWith(2, '2');
|
||||||
|
expect(removeSpy).toHaveBeenNthCalledWith(3, '3');
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('get queue', () => {
|
||||||
|
it('should choose the production queue', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.productionLimit', 2);
|
||||||
|
config.set('executions.concurrency.evaluationLimit', 2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
const queue = service.getQueue('webhook');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
expect(queue).toEqual(service.queues.get('production'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should choose the evaluation queue', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.productionLimit', 2);
|
||||||
|
config.set('executions.concurrency.evaluationLimit', 2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
const queue = service.getQueue('evaluation');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
expect(queue).toEqual(service.queues.get('evaluation'));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -388,6 +534,32 @@ describe('ConcurrencyControlService', () => {
|
||||||
*/
|
*/
|
||||||
expect(enqueueSpy).not.toHaveBeenCalled();
|
expect(enqueueSpy).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should do nothing for evaluation executions', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.evaluationLimit', -1);
|
||||||
|
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
await service.throttle({ mode: 'evaluation', executionId: '1' });
|
||||||
|
await service.throttle({ mode: 'evaluation', executionId: '2' });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(enqueueSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('release', () => {
|
describe('release', () => {
|
||||||
|
@ -415,6 +587,31 @@ describe('ConcurrencyControlService', () => {
|
||||||
*/
|
*/
|
||||||
expect(dequeueSpy).not.toHaveBeenCalled();
|
expect(dequeueSpy).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should do nothing for evaluation executions', () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.evaluationLimit', -1);
|
||||||
|
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
service.release({ mode: 'evaluation' });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(dequeueSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('remove', () => {
|
describe('remove', () => {
|
||||||
|
@ -442,6 +639,31 @@ describe('ConcurrencyControlService', () => {
|
||||||
*/
|
*/
|
||||||
expect(removeSpy).not.toHaveBeenCalled();
|
expect(removeSpy).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should do nothing for evaluation executions', () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.evaluationLimit', -1);
|
||||||
|
|
||||||
|
const service = new ConcurrencyControlService(
|
||||||
|
logger,
|
||||||
|
executionRepository,
|
||||||
|
telemetry,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
service.remove({ mode: 'evaluation', executionId: '1' });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(removeSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -470,14 +692,17 @@ describe('ConcurrencyControlService', () => {
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
// @ts-expect-error Private property
|
// @ts-expect-error Private property
|
||||||
service.productionQueue.emit('concurrency-check', {
|
service.queues.get('production').emit('concurrency-check', {
|
||||||
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert
|
* Assert
|
||||||
*/
|
*/
|
||||||
expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold });
|
expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', {
|
||||||
|
threshold,
|
||||||
|
concurrencyQueue: 'production',
|
||||||
|
});
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -500,7 +725,7 @@ describe('ConcurrencyControlService', () => {
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
// @ts-expect-error Private property
|
// @ts-expect-error Private property
|
||||||
service.productionQueue.emit('concurrency-check', {
|
service.queues.get('production').emit('concurrency-check', {
|
||||||
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -532,7 +757,7 @@ describe('ConcurrencyControlService', () => {
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
// @ts-expect-error Private property
|
// @ts-expect-error Private property
|
||||||
service.productionQueue.emit('concurrency-check', {
|
service.queues.get('production').emit('concurrency-check', {
|
||||||
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import { Service } from '@n8n/di';
|
import { Service } from '@n8n/di';
|
||||||
|
import { capitalize } from 'lodash';
|
||||||
import { Logger } from 'n8n-core';
|
import { Logger } from 'n8n-core';
|
||||||
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
|
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
|
||||||
|
|
||||||
|
@ -15,13 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue';
|
||||||
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
|
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
|
||||||
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
|
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
|
||||||
|
|
||||||
|
export type ConcurrencyQueueType = 'production' | 'evaluation';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ConcurrencyControlService {
|
export class ConcurrencyControlService {
|
||||||
private isEnabled: boolean;
|
private isEnabled: boolean;
|
||||||
|
|
||||||
private readonly productionLimit: number;
|
private readonly limits: Map<ConcurrencyQueueType, number>;
|
||||||
|
|
||||||
private readonly productionQueue: ConcurrencyQueue;
|
private readonly queues: Map<ConcurrencyQueueType, ConcurrencyQueue>;
|
||||||
|
|
||||||
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
|
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
|
||||||
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
|
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
|
||||||
|
@ -35,52 +38,74 @@ export class ConcurrencyControlService {
|
||||||
) {
|
) {
|
||||||
this.logger = this.logger.scoped('concurrency');
|
this.logger = this.logger.scoped('concurrency');
|
||||||
|
|
||||||
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
|
this.limits = new Map([
|
||||||
|
['production', config.getEnv('executions.concurrency.productionLimit')],
|
||||||
|
['evaluation', config.getEnv('executions.concurrency.evaluationLimit')],
|
||||||
|
]);
|
||||||
|
|
||||||
if (this.productionLimit === 0) {
|
this.limits.forEach((limit, type) => {
|
||||||
throw new InvalidConcurrencyLimitError(this.productionLimit);
|
if (limit === 0) {
|
||||||
}
|
throw new InvalidConcurrencyLimitError(limit);
|
||||||
|
}
|
||||||
|
|
||||||
if (this.productionLimit < -1) {
|
if (limit < -1) {
|
||||||
this.productionLimit = -1;
|
this.limits.set(type, -1);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
|
if (
|
||||||
|
Array.from(this.limits.values()).every((limit) => limit === -1) ||
|
||||||
|
config.getEnv('executions.mode') === 'queue'
|
||||||
|
) {
|
||||||
this.isEnabled = false;
|
this.isEnabled = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.productionQueue = new ConcurrencyQueue(this.productionLimit);
|
this.queues = new Map();
|
||||||
|
this.limits.forEach((limit, type) => {
|
||||||
|
if (limit > 0) {
|
||||||
|
this.queues.set(type, new ConcurrencyQueue(limit));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
this.logInit();
|
this.logInit();
|
||||||
|
|
||||||
this.isEnabled = true;
|
this.isEnabled = true;
|
||||||
|
|
||||||
this.productionQueue.on('concurrency-check', ({ capacity }) => {
|
this.queues.forEach((queue, type) => {
|
||||||
if (this.shouldReport(capacity)) {
|
queue.on('concurrency-check', ({ capacity }) => {
|
||||||
this.telemetry.track('User hit concurrency limit', {
|
if (this.shouldReport(capacity)) {
|
||||||
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
|
this.telemetry.track('User hit concurrency limit', {
|
||||||
});
|
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
|
||||||
}
|
concurrencyQueue: type,
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
this.productionQueue.on('execution-throttled', ({ executionId }) => {
|
queue.on('execution-throttled', ({ executionId }) => {
|
||||||
this.logger.debug('Execution throttled', { executionId });
|
this.logger.debug('Execution throttled', { executionId, type });
|
||||||
this.eventService.emit('execution-throttled', { executionId });
|
this.eventService.emit('execution-throttled', { executionId, type });
|
||||||
});
|
});
|
||||||
|
|
||||||
this.productionQueue.on('execution-released', async (executionId) => {
|
queue.on('execution-released', (executionId) => {
|
||||||
this.logger.debug('Execution released', { executionId });
|
this.logger.debug('Execution released', { executionId, type });
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether an execution is in the production queue.
|
* Check whether an execution is in any of the queues.
|
||||||
*/
|
*/
|
||||||
has(executionId: string) {
|
has(executionId: string) {
|
||||||
if (!this.isEnabled) return false;
|
if (!this.isEnabled) return false;
|
||||||
|
|
||||||
return this.productionQueue.getAll().has(executionId);
|
for (const queue of this.queues.values()) {
|
||||||
|
if (queue.has(executionId)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -89,16 +114,16 @@ export class ConcurrencyControlService {
|
||||||
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
||||||
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
||||||
|
|
||||||
await this.productionQueue.enqueue(executionId);
|
await this.getQueue(mode)?.enqueue(executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release capacity back so the next execution in the production queue can proceed.
|
* Release capacity back so the next execution in the queue can proceed.
|
||||||
*/
|
*/
|
||||||
release({ mode }: { mode: ExecutionMode }) {
|
release({ mode }: { mode: ExecutionMode }) {
|
||||||
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
||||||
|
|
||||||
this.productionQueue.dequeue();
|
this.getQueue(mode)?.dequeue();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -107,7 +132,7 @@ export class ConcurrencyControlService {
|
||||||
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
||||||
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
||||||
|
|
||||||
this.productionQueue.remove(executionId);
|
this.getQueue(mode)?.remove(executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -118,11 +143,13 @@ export class ConcurrencyControlService {
|
||||||
async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) {
|
async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) {
|
||||||
if (!this.isEnabled) return;
|
if (!this.isEnabled) return;
|
||||||
|
|
||||||
const enqueuedProductionIds = this.productionQueue.getAll();
|
this.queues.forEach((queue) => {
|
||||||
|
const enqueuedExecutionIds = queue.getAll();
|
||||||
|
|
||||||
for (const id of enqueuedProductionIds) {
|
for (const id of enqueuedExecutionIds) {
|
||||||
this.productionQueue.remove(id);
|
queue.remove(id);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
const executionIds = Object.entries(activeExecutions)
|
const executionIds = Object.entries(activeExecutions)
|
||||||
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
|
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
|
||||||
|
@ -146,15 +173,28 @@ export class ConcurrencyControlService {
|
||||||
private logInit() {
|
private logInit() {
|
||||||
this.logger.debug('Enabled');
|
this.logger.debug('Enabled');
|
||||||
|
|
||||||
this.logger.debug(
|
this.limits.forEach((limit, type) => {
|
||||||
[
|
this.logger.debug(
|
||||||
'Production execution concurrency is',
|
[
|
||||||
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
|
`${capitalize(type)} execution concurrency is`,
|
||||||
].join(' '),
|
limit === -1 ? 'unlimited' : 'limited to ' + limit.toString(),
|
||||||
);
|
].join(' '),
|
||||||
|
);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private isUnlimited(mode: ExecutionMode) {
|
private isUnlimited(mode: ExecutionMode) {
|
||||||
|
return this.getQueue(mode) === undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
private shouldReport(capacity: number) {
|
||||||
|
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the concurrency queue based on the execution mode.
|
||||||
|
*/
|
||||||
|
private getQueue(mode: ExecutionMode) {
|
||||||
if (
|
if (
|
||||||
mode === 'error' ||
|
mode === 'error' ||
|
||||||
mode === 'integrated' ||
|
mode === 'integrated' ||
|
||||||
|
@ -163,15 +203,13 @@ export class ConcurrencyControlService {
|
||||||
mode === 'manual' ||
|
mode === 'manual' ||
|
||||||
mode === 'retry'
|
mode === 'retry'
|
||||||
) {
|
) {
|
||||||
return true;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1;
|
if (mode === 'webhook' || mode === 'trigger') return this.queues.get('production');
|
||||||
|
|
||||||
|
if (mode === 'evaluation') return this.queues.get('evaluation');
|
||||||
|
|
||||||
throw new UnknownExecutionModeError(mode);
|
throw new UnknownExecutionModeError(mode);
|
||||||
}
|
}
|
||||||
|
|
||||||
private shouldReport(capacity: number) {
|
|
||||||
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,10 @@ export class ConcurrencyQueue extends TypedEmitter<ConcurrencyEvents> {
|
||||||
return new Set(this.queue.map((item) => item.executionId));
|
return new Set(this.queue.map((item) => item.executionId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
has(executionId: string) {
|
||||||
|
return this.queue.some((item) => item.executionId === executionId);
|
||||||
|
}
|
||||||
|
|
||||||
private resolveNext() {
|
private resolveNext() {
|
||||||
const item = this.queue.shift();
|
const item = this.queue.shift();
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,12 @@ export const schema = {
|
||||||
default: -1,
|
default: -1,
|
||||||
env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT',
|
env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT',
|
||||||
},
|
},
|
||||||
|
evaluationLimit: {
|
||||||
|
doc: 'Max evaluation executions allowed to run concurrently.',
|
||||||
|
format: Number,
|
||||||
|
default: -1,
|
||||||
|
env: 'N8N_CONCURRENCY_EVALUATION_LIMIT',
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
// A Workflow times out and gets canceled after this time (seconds).
|
// A Workflow times out and gets canceled after this time (seconds).
|
||||||
|
|
|
@ -934,6 +934,7 @@ describe('LogStreamingEventRelay', () => {
|
||||||
it('should log on `execution-throttled` event', () => {
|
it('should log on `execution-throttled` event', () => {
|
||||||
const event: RelayEventMap['execution-throttled'] = {
|
const event: RelayEventMap['execution-throttled'] = {
|
||||||
executionId: 'exec123456',
|
executionId: 'exec123456',
|
||||||
|
type: 'production',
|
||||||
};
|
};
|
||||||
|
|
||||||
eventService.emit('execution-throttled', event);
|
eventService.emit('execution-throttled', event);
|
||||||
|
@ -942,6 +943,7 @@ describe('LogStreamingEventRelay', () => {
|
||||||
eventName: 'n8n.execution.throttled',
|
eventName: 'n8n.execution.throttled',
|
||||||
payload: {
|
payload: {
|
||||||
executionId: 'exec123456',
|
executionId: 'exec123456',
|
||||||
|
type: 'production',
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -6,6 +6,7 @@ import type {
|
||||||
IWorkflowExecutionDataProcess,
|
IWorkflowExecutionDataProcess,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
|
import type { ConcurrencyQueueType } from '@/concurrency/concurrency-control.service';
|
||||||
import type { AuthProviderType } from '@/databases/entities/auth-identity';
|
import type { AuthProviderType } from '@/databases/entities/auth-identity';
|
||||||
import type { GlobalRole, User } from '@/databases/entities/user';
|
import type { GlobalRole, User } from '@/databases/entities/user';
|
||||||
import type { IWorkflowDb } from '@/interfaces';
|
import type { IWorkflowDb } from '@/interfaces';
|
||||||
|
@ -337,6 +338,7 @@ export type RelayEventMap = {
|
||||||
|
|
||||||
'execution-throttled': {
|
'execution-throttled': {
|
||||||
executionId: string;
|
executionId: string;
|
||||||
|
type: ConcurrencyQueueType;
|
||||||
};
|
};
|
||||||
|
|
||||||
'execution-started-during-bootup': {
|
'execution-started-during-bootup': {
|
||||||
|
|
|
@ -385,10 +385,10 @@ export class LogStreamingEventRelay extends EventRelay {
|
||||||
|
|
||||||
// #region Execution
|
// #region Execution
|
||||||
|
|
||||||
private executionThrottled({ executionId }: RelayEventMap['execution-throttled']) {
|
private executionThrottled({ executionId, type }: RelayEventMap['execution-throttled']) {
|
||||||
void this.eventBus.sendExecutionEvent({
|
void this.eventBus.sendExecutionEvent({
|
||||||
eventName: 'n8n.execution.throttled',
|
eventName: 'n8n.execution.throttled',
|
||||||
payload: { executionId },
|
payload: { executionId, type },
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
<script lang="ts" setup>
|
<script lang="ts" setup>
|
||||||
import { computed } from 'vue';
|
import { computed } from 'vue';
|
||||||
import { useI18n } from '@/composables/useI18n';
|
import { useI18n } from '@/composables/useI18n';
|
||||||
|
import { WORKFLOW_EVALUATION_EXPERIMENT } from '@/constants';
|
||||||
|
import { usePostHog } from '@/stores/posthog.store';
|
||||||
|
|
||||||
const props = defineProps<{
|
const props = defineProps<{
|
||||||
runningExecutionsCount: number;
|
runningExecutionsCount: number;
|
||||||
|
@ -14,14 +16,22 @@ const emit = defineEmits<{
|
||||||
|
|
||||||
const i18n = useI18n();
|
const i18n = useI18n();
|
||||||
|
|
||||||
const tooltipText = computed(() =>
|
const posthogStore = usePostHog();
|
||||||
i18n.baseText('executionsList.activeExecutions.tooltip', {
|
|
||||||
|
const tooltipText = computed(() => {
|
||||||
|
let text = i18n.baseText('executionsList.activeExecutions.tooltip', {
|
||||||
interpolate: {
|
interpolate: {
|
||||||
running: props.runningExecutionsCount,
|
running: props.runningExecutionsCount,
|
||||||
cap: props.concurrencyCap,
|
cap: props.concurrencyCap,
|
||||||
},
|
},
|
||||||
}),
|
});
|
||||||
);
|
|
||||||
|
if (posthogStore.isFeatureEnabled(WORKFLOW_EVALUATION_EXPERIMENT)) {
|
||||||
|
text += '\n' + i18n.baseText('executionsList.activeExecutions.evaluationNote');
|
||||||
|
}
|
||||||
|
|
||||||
|
return text;
|
||||||
|
});
|
||||||
|
|
||||||
const headerText = computed(() => {
|
const headerText = computed(() => {
|
||||||
if (props.runningExecutionsCount === 0) {
|
if (props.runningExecutionsCount === 0) {
|
||||||
|
@ -38,6 +48,7 @@ const headerText = computed(() => {
|
||||||
|
|
||||||
<template>
|
<template>
|
||||||
<div data-test-id="concurrent-executions-header">
|
<div data-test-id="concurrent-executions-header">
|
||||||
|
<n8n-text>{{ headerText }}</n8n-text>
|
||||||
<n8n-tooltip>
|
<n8n-tooltip>
|
||||||
<template #content>
|
<template #content>
|
||||||
<div :class="$style.tooltip">
|
<div :class="$style.tooltip">
|
||||||
|
@ -60,9 +71,8 @@ const headerText = computed(() => {
|
||||||
>
|
>
|
||||||
</div>
|
</div>
|
||||||
</template>
|
</template>
|
||||||
<font-awesome-icon icon="info-circle" class="mr-2xs" />
|
<font-awesome-icon icon="info-circle" class="ml-2xs" />
|
||||||
</n8n-tooltip>
|
</n8n-tooltip>
|
||||||
<n8n-text>{{ headerText }}</n8n-text>
|
|
||||||
</div>
|
</div>
|
||||||
</template>
|
</template>
|
||||||
|
|
||||||
|
|
|
@ -73,8 +73,15 @@ const isAnnotationEnabled = computed(
|
||||||
() => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters],
|
() => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the number of executions counted towards the production executions concurrency limit.
|
||||||
|
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
|
||||||
|
*/
|
||||||
const runningExecutionsCount = computed(() => {
|
const runningExecutionsCount = computed(() => {
|
||||||
return props.executions.filter((execution) => execution.status === 'running').length;
|
return props.executions.filter(
|
||||||
|
(execution) =>
|
||||||
|
execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode),
|
||||||
|
).length;
|
||||||
});
|
});
|
||||||
|
|
||||||
watch(
|
watch(
|
||||||
|
|
|
@ -54,8 +54,15 @@ const executionListRef = ref<HTMLElement | null>(null);
|
||||||
|
|
||||||
const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow);
|
const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the number of executions counted towards the production executions concurrency limit.
|
||||||
|
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
|
||||||
|
*/
|
||||||
const runningExecutionsCount = computed(() => {
|
const runningExecutionsCount = computed(() => {
|
||||||
return props.executions.filter((execution) => execution.status === 'running').length;
|
return props.executions.filter(
|
||||||
|
(execution) =>
|
||||||
|
execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode),
|
||||||
|
).length;
|
||||||
});
|
});
|
||||||
|
|
||||||
watch(
|
watch(
|
||||||
|
|
|
@ -714,6 +714,7 @@
|
||||||
"executionsList.activeExecutions.none": "No active executions",
|
"executionsList.activeExecutions.none": "No active executions",
|
||||||
"executionsList.activeExecutions.header": "{running}/{cap} active executions",
|
"executionsList.activeExecutions.header": "{running}/{cap} active executions",
|
||||||
"executionsList.activeExecutions.tooltip": "Current active executions: {running} out of {cap}. This instance is limited to {cap} concurrent production executions.",
|
"executionsList.activeExecutions.tooltip": "Current active executions: {running} out of {cap}. This instance is limited to {cap} concurrent production executions.",
|
||||||
|
"executionsList.activeExecutions.evaluationNote": "Evaluation runs appear in the list of executions but do not count towards your execution concurrency.",
|
||||||
"executionsList.allWorkflows": "All Workflows",
|
"executionsList.allWorkflows": "All Workflows",
|
||||||
"executionsList.anyStatus": "Any Status",
|
"executionsList.anyStatus": "Any Status",
|
||||||
"executionsList.autoRefresh": "Auto refresh",
|
"executionsList.autoRefresh": "Auto refresh",
|
||||||
|
|
Loading…
Reference in a new issue