diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index 9beae22af6..f1ae78f838 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -11,7 +11,7 @@ import { mockInstance } from '@test/mocking'; import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants'; import type { JobProcessor } from '../job-processor'; import { ScalingService } from '../scaling.service'; -import type { Job, JobData, JobOptions, JobQueue } from '../scaling.types'; +import type { Job, JobData, JobQueue } from '../scaling.types'; const queue = mock({ client: { ping: jest.fn() }, @@ -208,10 +208,13 @@ describe('ScalingService', () => { queue.add.mockResolvedValue(mock({ id: '456' })); const jobData = mock({ executionId: '123' }); - const jobOptions = mock(); - await scalingService.addJob(jobData, jobOptions); + await scalingService.addJob(jobData, { priority: 100 }); - expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, jobOptions); + expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, { + priority: 100, + removeOnComplete: true, + removeOnFail: true, + }); }); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index b7c4034afa..c670c52b99 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -2,6 +2,7 @@ import { GlobalConfig } from '@n8n/config'; import { InstanceSettings } from 'n8n-core'; import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify } from 'n8n-workflow'; import type { IExecuteResponsePromiseData } from 'n8n-workflow'; +import { strict } from 'node:assert'; import Container, { Service } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; @@ -124,12 +125,24 @@ export class ScalingService { return { active, waiting }; } - async addJob(jobData: JobData, jobOptions: JobOptions) { - const { executionId } = jobData; + /** + * Add a job to the queue. + * + * @param jobData Data of the job to add to the queue. + * @param priority Priority of the job, from `1` (highest) to `MAX_SAFE_INTEGER` (lowest). + */ + async addJob(jobData: JobData, { priority }: { priority: number }) { + strict(priority > 0 && priority <= Number.MAX_SAFE_INTEGER); + + const jobOptions: JobOptions = { + priority, + removeOnComplete: true, + removeOnFail: true, + }; const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions); - this.logger.info(`[ScalingService] Added job ${job.id} (execution ${executionId})`); + this.logger.info(`[ScalingService] Added job ${job.id} (execution ${jobData.executionId})`); return job; } diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index dd394cbf51..637c94d866 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -376,22 +376,12 @@ export class WorkflowRunner { this.scalingService = Container.get(ScalingService); } - let priority = 100; - if (realtime === true) { - // Jobs which require a direct response get a higher priority - priority = 50; - } // TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds. // Check if they get retried by default and how often. - const jobOptions = { - priority, - removeOnComplete: true, - removeOnFail: true, - }; let job: Job; let hooks: WorkflowHooks; try { - job = await this.scalingService.addJob(jobData, jobOptions); + job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain( data.executionMode,