wip: refactor concurrency control service

This commit is contained in:
Eugene Molodkin 2024-12-24 14:44:57 +01:00
parent 7ea6c8b144
commit 1caf41a34a
No known key found for this signature in database
2 changed files with 68 additions and 4 deletions

View file

@ -19,10 +19,18 @@ export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
export class ConcurrencyControlService { export class ConcurrencyControlService {
private isEnabled: boolean; private isEnabled: boolean;
// private readonly limits: Map<ExecutionMode, number>;
private readonly productionLimit: number; private readonly productionLimit: number;
private readonly evaluationLimit: number;
// private readonly queues: Map<ExecutionMode, ConcurrencyQueue>;
private readonly productionQueue: ConcurrencyQueue; private readonly productionQueue: ConcurrencyQueue;
private readonly evaluationQueue: ConcurrencyQueue;
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
); );
@ -37,14 +45,24 @@ export class ConcurrencyControlService {
this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
this.evaluationLimit = config.getEnv('executions.concurrency.evaluationLimit');
if (this.productionLimit === 0) { if (this.productionLimit === 0) {
throw new InvalidConcurrencyLimitError(this.productionLimit); throw new InvalidConcurrencyLimitError(this.productionLimit);
} }
if (this.evaluationLimit === 0) {
throw new InvalidConcurrencyLimitError(this.evaluationLimit);
}
if (this.productionLimit < -1) { if (this.productionLimit < -1) {
this.productionLimit = -1; this.productionLimit = -1;
} }
if (this.evaluationLimit < -1) {
this.evaluationLimit = -1;
}
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
this.isEnabled = false; this.isEnabled = false;
return; return;
@ -52,6 +70,8 @@ export class ConcurrencyControlService {
this.productionQueue = new ConcurrencyQueue(this.productionLimit); this.productionQueue = new ConcurrencyQueue(this.productionLimit);
this.evaluationQueue = new ConcurrencyQueue(this.evaluationLimit);
this.logInit(); this.logInit();
this.isEnabled = true; this.isEnabled = true;
@ -80,7 +100,10 @@ export class ConcurrencyControlService {
has(executionId: string) { has(executionId: string) {
if (!this.isEnabled) return false; if (!this.isEnabled) return false;
return this.productionQueue.getAll().has(executionId); return (
this.productionQueue.getAll().has(executionId) ||
this.evaluationQueue.getAll().has(executionId)
);
} }
/** /**
@ -89,7 +112,11 @@ export class ConcurrencyControlService {
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return; if (!this.isEnabled || this.isUnlimited(mode)) return;
await this.productionQueue.enqueue(executionId); if (mode === 'evaluation') {
await this.evaluationQueue.enqueue(executionId);
} else {
await this.productionQueue.enqueue(executionId);
}
} }
/** /**
@ -98,7 +125,11 @@ export class ConcurrencyControlService {
release({ mode }: { mode: ExecutionMode }) { release({ mode }: { mode: ExecutionMode }) {
if (!this.isEnabled || this.isUnlimited(mode)) return; if (!this.isEnabled || this.isUnlimited(mode)) return;
this.productionQueue.dequeue(); if (mode === 'evaluation') {
this.evaluationQueue.dequeue();
} else {
this.productionQueue.dequeue();
}
} }
/** /**
@ -107,7 +138,11 @@ export class ConcurrencyControlService {
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return; if (!this.isEnabled || this.isUnlimited(mode)) return;
this.productionQueue.remove(executionId); if (mode === 'evaluation') {
this.evaluationQueue.remove(executionId);
} else {
this.productionQueue.remove(executionId);
}
} }
/** /**
@ -124,6 +159,12 @@ export class ConcurrencyControlService {
this.productionQueue.remove(id); this.productionQueue.remove(id);
} }
const enqueuedEvaluationIds = this.evaluationQueue.getAll();
for (const id of enqueuedEvaluationIds) {
this.evaluationQueue.remove(id);
}
const executionIds = Object.entries(activeExecutions) const executionIds = Object.entries(activeExecutions)
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
.map(([executionId, _]) => executionId); .map(([executionId, _]) => executionId);
@ -152,6 +193,13 @@ export class ConcurrencyControlService {
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
].join(' '), ].join(' '),
); );
this.logger.debug(
[
'Evaluation execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.evaluationLimit.toString(),
].join(' '),
);
} }
private isUnlimited(mode: ExecutionMode) { private isUnlimited(mode: ExecutionMode) {
@ -168,10 +216,20 @@ export class ConcurrencyControlService {
if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1;
if (mode === 'evaluation') return this.evaluationLimit === -1;
throw new UnknownExecutionModeError(mode); throw new UnknownExecutionModeError(mode);
} }
private shouldReport(capacity: number) { private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
} }
// private getQueue(mode: ExecutionMode) {
// if (['production', 'evaluation'].includes(mode)) {
// return this.queues.get(mode);
// }
//
// throw new UnknownExecutionModeError(mode);
// }
} }

View file

@ -35,6 +35,12 @@ export const schema = {
default: -1, default: -1,
env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT',
}, },
evaluationLimit: {
doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.',
format: Number,
default: 1,
env: 'N8N_CONCURRENCY_EVALUATION_LIMIT',
},
}, },
// A Workflow times out and gets canceled after this time (seconds). // A Workflow times out and gets canceled after this time (seconds).