From 1caf41a34a274ede091e7dc8c4621f8f7982b53d Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Tue, 24 Dec 2024 14:44:57 +0100 Subject: [PATCH] wip: refactor concurrency control service --- .../concurrency-control.service.ts | 66 +++++++++++++++++-- packages/cli/src/config/schema.ts | 6 ++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index ede6cf8997..1aced615a8 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -19,10 +19,18 @@ export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; export class ConcurrencyControlService { private isEnabled: boolean; + // private readonly limits: Map; + private readonly productionLimit: number; + private readonly evaluationLimit: number; + + // private readonly queues: Map; + private readonly productionQueue: ConcurrencyQueue; + private readonly evaluationQueue: ConcurrencyQueue; + private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, ); @@ -37,14 +45,24 @@ export class ConcurrencyControlService { this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); + this.evaluationLimit = config.getEnv('executions.concurrency.evaluationLimit'); + if (this.productionLimit === 0) { throw new InvalidConcurrencyLimitError(this.productionLimit); } + if (this.evaluationLimit === 0) { + throw new InvalidConcurrencyLimitError(this.evaluationLimit); + } + if (this.productionLimit < -1) { this.productionLimit = -1; } + if (this.evaluationLimit < -1) { + this.evaluationLimit = -1; + } + if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { this.isEnabled = false; return; @@ -52,6 +70,8 @@ export class ConcurrencyControlService { this.productionQueue = new ConcurrencyQueue(this.productionLimit); + this.evaluationQueue = new ConcurrencyQueue(this.evaluationLimit); + this.logInit(); this.isEnabled = true; @@ -80,7 +100,10 @@ export class ConcurrencyControlService { has(executionId: string) { if (!this.isEnabled) return false; - return this.productionQueue.getAll().has(executionId); + return ( + this.productionQueue.getAll().has(executionId) || + this.evaluationQueue.getAll().has(executionId) + ); } /** @@ -89,7 +112,11 @@ export class ConcurrencyControlService { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - await this.productionQueue.enqueue(executionId); + if (mode === 'evaluation') { + await this.evaluationQueue.enqueue(executionId); + } else { + await this.productionQueue.enqueue(executionId); + } } /** @@ -98,7 +125,11 @@ export class ConcurrencyControlService { release({ mode }: { mode: ExecutionMode }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - this.productionQueue.dequeue(); + if (mode === 'evaluation') { + this.evaluationQueue.dequeue(); + } else { + this.productionQueue.dequeue(); + } } /** @@ -107,7 +138,11 @@ export class ConcurrencyControlService { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - this.productionQueue.remove(executionId); + if (mode === 'evaluation') { + this.evaluationQueue.remove(executionId); + } else { + this.productionQueue.remove(executionId); + } } /** @@ -124,6 +159,12 @@ export class ConcurrencyControlService { this.productionQueue.remove(id); } + const enqueuedEvaluationIds = this.evaluationQueue.getAll(); + + for (const id of enqueuedEvaluationIds) { + this.evaluationQueue.remove(id); + } + const executionIds = Object.entries(activeExecutions) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) .map(([executionId, _]) => executionId); @@ -152,6 +193,13 @@ export class ConcurrencyControlService { this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), ].join(' '), ); + + this.logger.debug( + [ + 'Evaluation execution concurrency is', + this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.evaluationLimit.toString(), + ].join(' '), + ); } private isUnlimited(mode: ExecutionMode) { @@ -168,10 +216,20 @@ export class ConcurrencyControlService { if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; + if (mode === 'evaluation') return this.evaluationLimit === -1; + throw new UnknownExecutionModeError(mode); } private shouldReport(capacity: number) { return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); } + + // private getQueue(mode: ExecutionMode) { + // if (['production', 'evaluation'].includes(mode)) { + // return this.queues.get(mode); + // } + // + // throw new UnknownExecutionModeError(mode); + // } } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index e8d28cb782..20d03f07d5 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -35,6 +35,12 @@ export const schema = { default: -1, env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', }, + evaluationLimit: { + doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.', + format: Number, + default: 1, + env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', + }, }, // A Workflow times out and gets canceled after this time (seconds).