refactor(core): Bring job options into scaling service (#11050)

This commit is contained in:
Iván Ovejero 2024-10-02 10:31:07 +02:00 committed by GitHub
parent 8a30f92156
commit 113a2e7401
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 24 additions and 18 deletions

View file

@ -11,7 +11,7 @@ import { mockInstance } from '@test/mocking';
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
import type { JobProcessor } from '../job-processor';
import { ScalingService } from '../scaling.service';
import type { Job, JobData, JobOptions, JobQueue } from '../scaling.types';
import type { Job, JobData, JobQueue } from '../scaling.types';
const queue = mock<JobQueue>({
client: { ping: jest.fn() },
@ -208,10 +208,13 @@ describe('ScalingService', () => {
queue.add.mockResolvedValue(mock<Job>({ id: '456' }));
const jobData = mock<JobData>({ executionId: '123' });
const jobOptions = mock<JobOptions>();
await scalingService.addJob(jobData, jobOptions);
await scalingService.addJob(jobData, { priority: 100 });
expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, jobOptions);
expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, {
priority: 100,
removeOnComplete: true,
removeOnFail: true,
});
});
});

View file

@ -2,6 +2,7 @@ import { GlobalConfig } from '@n8n/config';
import { InstanceSettings } from 'n8n-core';
import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify } from 'n8n-workflow';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { strict } from 'node:assert';
import Container, { Service } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
@ -124,12 +125,24 @@ export class ScalingService {
return { active, waiting };
}
async addJob(jobData: JobData, jobOptions: JobOptions) {
const { executionId } = jobData;
/**
* Add a job to the queue.
*
* @param jobData Data of the job to add to the queue.
* @param priority Priority of the job, from `1` (highest) to `MAX_SAFE_INTEGER` (lowest).
*/
async addJob(jobData: JobData, { priority }: { priority: number }) {
strict(priority > 0 && priority <= Number.MAX_SAFE_INTEGER);
const jobOptions: JobOptions = {
priority,
removeOnComplete: true,
removeOnFail: true,
};
const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions);
this.logger.info(`[ScalingService] Added job ${job.id} (execution ${executionId})`);
this.logger.info(`[ScalingService] Added job ${job.id} (execution ${jobData.executionId})`);
return job;
}

View file

@ -376,22 +376,12 @@ export class WorkflowRunner {
this.scalingService = Container.get(ScalingService);
}
let priority = 100;
if (realtime === true) {
// Jobs which require a direct response get a higher priority
priority = 50;
}
// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.
// Check if they get retried by default and how often.
const jobOptions = {
priority,
removeOnComplete: true,
removeOnFail: true,
};
let job: Job;
let hooks: WorkflowHooks;
try {
job = await this.scalingService.addJob(jobData, jobOptions);
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(
data.executionMode,