diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index f6c3f4670f..18102f8f4c 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -42,7 +42,7 @@ import type { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { ExternalHooks } from './external-hooks'; import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants'; import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types'; -import type { RunningJobSummary } from './scaling/types'; +import type { RunningJobSummary } from './scaling/scaling.types'; import type { Scope } from '@n8n/permissions'; export interface ICredentialsTypeData { diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts index 99370c1e1b..dfdb4099bd 100644 --- a/packages/cli/src/executions/__tests__/execution.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -11,7 +11,7 @@ import type { WaitTracker } from '@/wait-tracker'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRequest } from '@/executions/execution.types'; import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; -import type { Job } from '@/scaling/types'; +import type { Job } from '@/scaling/scaling.types'; import { mockInstance } from '@test/mocking'; describe('ExecutionService', () => { diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index adbf5ebde2..f87d02b33d 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -3,7 +3,7 @@ import { ScalingService } from '../scaling.service'; import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants'; import config from '@/config'; import * as BullModule from 'bull'; -import type { Job, JobData, JobOptions, JobQueue } from '../types'; +import type { Job, JobData, JobOptions, JobQueue } from '../scaling.types'; import { ApplicationError } from 'n8n-workflow'; import { mockInstance } from '@test/mocking'; import { GlobalConfig } from '@n8n/config'; @@ -22,6 +22,8 @@ jest.mock('bull', () => ({ })); describe('ScalingService', () => { + const Bull = jest.mocked(BullModule.default); + const globalConfig = mockInstance(GlobalConfig, { queue: { bull: { @@ -40,11 +42,29 @@ describe('ScalingService', () => { const instanceSettings = Container.get(InstanceSettings); const orchestrationService = mock({ isMultiMainSetupEnabled: false }); const jobProcessor = mock(); + let scalingService: ScalingService; + let registerMainListenersSpy: jest.SpyInstance; + let registerWorkerListenersSpy: jest.SpyInstance; + let scheduleQueueRecoverySpy: jest.SpyInstance; + let stopQueueRecoverySpy: jest.SpyInstance; + let getRunningJobsCountSpy: jest.SpyInstance; + + const bullConstructorArgs = [ + QUEUE_NAME, + { + prefix: globalConfig.queue.bull.prefix, + settings: globalConfig.queue.bull.settings, + createClient: expect.any(Function), + }, + ]; + beforeEach(() => { jest.clearAllMocks(); config.set('generic.instanceType', 'main'); + instanceSettings.markAsLeader(); + scalingService = new ScalingService( mock(), mock(), @@ -54,98 +74,93 @@ describe('ScalingService', () => { instanceSettings, orchestrationService, ); - }); - afterEach(() => { - scalingService.stopQueueRecovery(); + getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount'); + + // @ts-expect-error Private method + ScalingService.prototype.scheduleQueueRecovery = jest.fn(); + // @ts-expect-error Private method + registerMainListenersSpy = jest.spyOn(scalingService, 'registerMainListeners'); + // @ts-expect-error Private method + registerWorkerListenersSpy = jest.spyOn(scalingService, 'registerWorkerListeners'); + // @ts-expect-error Private method + scheduleQueueRecoverySpy = jest.spyOn(scalingService, 'scheduleQueueRecovery'); + // @ts-expect-error Private method + stopQueueRecoverySpy = jest.spyOn(scalingService, 'stopQueueRecovery'); }); describe('setupQueue', () => { - it('should set up the queue', async () => { - /** - * Arrange - */ - const { prefix, settings } = globalConfig.queue.bull; - const Bull = jest.mocked(BullModule.default); + describe('if leader main', () => { + it('should set up queue + listeners + queue recovery', async () => { + await scalingService.setupQueue(); - /** - * Act - */ - await scalingService.setupQueue(); - - /** - * Assert - */ - expect(Bull).toHaveBeenCalledWith(QUEUE_NAME, { - prefix, - settings, - createClient: expect.any(Function), + expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); + expect(registerMainListenersSpy).toHaveBeenCalled(); + expect(registerWorkerListenersSpy).not.toHaveBeenCalled(); + expect(scheduleQueueRecoverySpy).toHaveBeenCalled(); + }); + }); + + describe('if follower main', () => { + it('should set up queue + listeners', async () => { + instanceSettings.markAsFollower(); + + await scalingService.setupQueue(); + + expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); + expect(registerMainListenersSpy).toHaveBeenCalled(); + expect(registerWorkerListenersSpy).not.toHaveBeenCalled(); + expect(scheduleQueueRecoverySpy).not.toHaveBeenCalled(); + }); + }); + + describe('if worker', () => { + it('should set up queue + listeners', async () => { + // @ts-expect-error Private field + scalingService.instanceType = 'worker'; + + await scalingService.setupQueue(); + + expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); + expect(registerWorkerListenersSpy).toHaveBeenCalled(); + expect(registerMainListenersSpy).not.toHaveBeenCalled(); }); - expect(queue.on).toHaveBeenCalledWith('global:progress', expect.any(Function)); - expect(queue.on).toHaveBeenCalledWith('error', expect.any(Function)); }); }); describe('setupWorker', () => { it('should set up a worker with concurrency', async () => { - /** - * Arrange - */ - config.set('generic.instanceType', 'worker'); - const scalingService = new ScalingService( - mock(), - mock(), - mock(), - globalConfig, - mock(), - instanceSettings, - orchestrationService, - ); + // @ts-expect-error Private field + scalingService.instanceType = 'worker'; await scalingService.setupQueue(); const concurrency = 5; - /** - * Act - */ scalingService.setupWorker(concurrency); - /** - * Assert - */ expect(queue.process).toHaveBeenCalledWith(JOB_TYPE_NAME, concurrency, expect.any(Function)); }); it('should throw if called on a non-worker instance', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); - /** - * Act and Assert - */ + expect(() => scalingService.setupWorker(5)).toThrow(); + }); + + it('should throw if called before queue is ready', async () => { + // @ts-expect-error Private field + scalingService.instanceType = 'worker'; + expect(() => scalingService.setupWorker(5)).toThrow(); }); }); describe('stop', () => { - it('should pause the queue, check for running jobs, and stop queue recovery', async () => { - /** - * Arrange - */ + it('should pause queue, wait for running jobs, stop queue recovery', async () => { await scalingService.setupQueue(); jobProcessor.getRunningJobIds.mockReturnValue([]); - const stopQueueRecoverySpy = jest.spyOn(scalingService, 'stopQueueRecovery'); - const getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount'); - /** - * Act - */ await scalingService.stop(); - /** - * Assert - */ expect(queue.pause).toHaveBeenCalledWith(true, true); expect(stopQueueRecoverySpy).toHaveBeenCalled(); expect(getRunningJobsCountSpy).toHaveBeenCalled(); @@ -154,62 +169,35 @@ describe('ScalingService', () => { describe('pingQueue', () => { it('should ping the queue', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); - /** - * Act - */ await scalingService.pingQueue(); - /** - * Assert - */ expect(queue.client.ping).toHaveBeenCalled(); }); }); describe('addJob', () => { it('should add a job', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); queue.add.mockResolvedValue(mock({ id: '456' })); - /** - * Act - */ const jobData = mock({ executionId: '123' }); const jobOptions = mock(); await scalingService.addJob(jobData, jobOptions); - /** - * Assert - */ expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, jobOptions); }); }); describe('getJob', () => { it('should get a job', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); const jobId = '123'; queue.getJob.mockResolvedValue(mock({ id: jobId })); - /** - * Act - */ const job = await scalingService.getJob(jobId); - /** - * Assert - */ expect(queue.getJob).toHaveBeenCalledWith(jobId); expect(job?.id).toBe(jobId); }); @@ -217,88 +205,49 @@ describe('ScalingService', () => { describe('findJobsByStatus', () => { it('should find jobs by status', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); queue.getJobs.mockResolvedValue([mock({ id: '123' })]); - /** - * Act - */ const jobs = await scalingService.findJobsByStatus(['active']); - /** - * Assert - */ expect(queue.getJobs).toHaveBeenCalledWith(['active']); expect(jobs).toHaveLength(1); expect(jobs.at(0)?.id).toBe('123'); }); it('should filter out `null` in Redis response', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); // @ts-expect-error - Untyped but possible Redis response queue.getJobs.mockResolvedValue([mock(), null]); - /** - * Act - */ const jobs = await scalingService.findJobsByStatus(['waiting']); - /** - * Assert - */ expect(jobs).toHaveLength(1); }); }); describe('stopJob', () => { it('should stop an active job', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); const job = mock({ isActive: jest.fn().mockResolvedValue(true) }); - /** - * Act - */ const result = await scalingService.stopJob(job); - /** - * Assert - */ expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' }); expect(result).toBe(true); }); it('should stop an inactive job', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); const job = mock({ isActive: jest.fn().mockResolvedValue(false) }); - /** - * Act - */ const result = await scalingService.stopJob(job); - /** - * Assert - */ expect(job.remove).toHaveBeenCalled(); expect(result).toBe(true); }); it('should report failure to stop a job', async () => { - /** - * Arrange - */ await scalingService.setupQueue(); const job = mock({ isActive: jest.fn().mockImplementation(() => { @@ -306,53 +255,9 @@ describe('ScalingService', () => { }), }); - /** - * Act - */ const result = await scalingService.stopJob(job); - /** - * Assert - */ expect(result).toBe(false); }); }); - - describe('scheduleQueueRecovery', () => { - it('if leader, should schedule queue recovery', async () => { - /** - * Arrange - */ - const scheduleSpy = jest.spyOn(scalingService, 'scheduleQueueRecovery'); - instanceSettings.markAsLeader(); - - /** - * Act - */ - await scalingService.setupQueue(); - - /** - * Assert - */ - expect(scheduleSpy).toHaveBeenCalled(); - }); - - it('if follower, should not schedule queue recovery', async () => { - /** - * Arrange - */ - const scheduleSpy = jest.spyOn(scalingService, 'scheduleQueueRecovery'); - instanceSettings.markAsFollower(); - - /** - * Act - */ - await scalingService.setupQueue(); - - /** - * Assert - */ - expect(scheduleSpy).not.toHaveBeenCalled(); - }); - }); }); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 8618424e35..7804a6d6c8 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -8,7 +8,7 @@ import { WorkflowRepository } from '@/databases/repositories/workflow.repository import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import { NodeTypes } from '@/node-types'; import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; -import type { Job, JobId, JobResult, RunningJob, RunningJobSummary } from './types'; +import type { Job, JobId, JobResult, RunningJob, RunningJobSummary } from './scaling.types'; import type PCancelable from 'p-cancelable'; /** diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 4dbdce1aac..37436cdb43 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -13,11 +13,11 @@ import type { Job, JobData, JobOptions, - JobMessage, JobStatus, JobId, QueueRecoveryContext, -} from './types'; + PubSubMessage, +} from './scaling.types'; import type { IExecuteResponsePromiseData } from 'n8n-workflow'; import { GlobalConfig } from '@n8n/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; @@ -71,6 +71,7 @@ export class ScalingService { setupWorker(concurrency: number) { this.assertWorker(); + this.assertQueue(); void this.queue.process( JOB_TYPE_NAME, @@ -161,22 +162,6 @@ export class ScalingService { // #region Listeners private registerListeners() { - this.queue.on('global:progress', (_jobId: JobId, msg: JobMessage) => { - if (msg.kind === 'respond-to-webhook') { - const { executionId, response } = msg; - this.activeExecutions.resolveResponsePromise( - executionId, - this.decodeWebhookResponse(response), - ); - } - }); - - this.queue.on('global:progress', (jobId: JobId, msg: JobMessage) => { - if (msg.kind === 'abort-job') { - this.jobProcessor.stopJob(jobId); - } - }); - let latestAttemptTs = 0; let cumulativeTimeoutMs = 0; @@ -210,10 +195,28 @@ export class ScalingService { return; } - if ( - this.instanceType === 'worker' && - error.message.includes('job stalled more than maxStalledCount') - ) { + throw error; + }); + + if (this.instanceType === 'main') { + this.registerMainListeners(); + } else if (this.instanceType === 'worker') { + this.registerWorkerListeners(); + } + } + + /** + * Register listeners on a `worker` process for Bull queue events. + */ + private registerWorkerListeners() { + this.queue.on('global:progress', (jobId: JobId, msg: unknown) => { + if (!this.isPubSubMessage(msg)) return; + + if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId); + }); + + this.queue.on('error', (error: Error) => { + if (error.message.includes('job stalled more than maxStalledCount')) { throw new MaxStalledCountError(error); } @@ -221,10 +224,7 @@ export class ScalingService { * Non-recoverable error on worker start with Redis unavailable. * Even if Redis recovers, worker will remain unable to process jobs. */ - if ( - this.instanceType === 'worker' && - error.message.includes('Error initializing Lua scripts') - ) { + if (error.message.includes('Error initializing Lua scripts')) { this.logger.error('[ScalingService] Fatal error initializing worker', { error }); this.logger.error('[ScalingService] Exiting process...'); process.exit(1); @@ -234,6 +234,24 @@ export class ScalingService { }); } + /** + * Register listeners on a `main` process for Bull queue events. + */ + private registerMainListeners() { + this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => { + if (!this.isPubSubMessage(msg)) return; + + if (msg.kind === 'respond-to-webhook') { + const decodedResponse = this.decodeWebhookResponse(msg.response); + this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); + } + }); + } + + private isPubSubMessage(candidate: unknown): candidate is PubSubMessage { + return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; + } + // #endregion private decodeWebhookResponse( @@ -252,6 +270,12 @@ export class ScalingService { return response; } + private assertQueue() { + if (this.queue) return; + + throw new ApplicationError('This method must be called after `setupQueue`'); + } + private assertWorker() { if (this.instanceType === 'worker') return; @@ -265,7 +289,7 @@ export class ScalingService { waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000, }; - scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) { + private scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) { this.queueRecoveryContext.timeout = setTimeout(async () => { try { const nextWaitMs = await this.recoverFromQueue(); @@ -285,7 +309,7 @@ export class ScalingService { this.logger.debug(`[ScalingService] Scheduled queue recovery check for next ${wait}`); } - stopQueueRecovery() { + private stopQueueRecovery() { clearTimeout(this.queueRecoveryContext.timeout); } diff --git a/packages/cli/src/scaling/types.ts b/packages/cli/src/scaling/scaling.types.ts similarity index 85% rename from packages/cli/src/scaling/types.ts rename to packages/cli/src/scaling/scaling.types.ts index b35d1d109d..2599cf594b 100644 --- a/packages/cli/src/scaling/types.ts +++ b/packages/cli/src/scaling/scaling.types.ts @@ -28,16 +28,19 @@ export type JobStatus = Bull.JobStatus; export type JobOptions = Bull.JobOptions; -/** Message sent by worker to queue or by queue to worker. */ -export type JobMessage = RepondToWebhookMessage | AbortJobMessage; +export type PubSubMessage = MessageToMain | MessageToWorker; -export type RepondToWebhookMessage = { +type MessageToMain = RepondToWebhookMessage; + +type MessageToWorker = AbortJobMessage; + +type RepondToWebhookMessage = { kind: 'respond-to-webhook'; executionId: string; response: IExecuteResponsePromiseData; }; -export type AbortJobMessage = { +type AbortJobMessage = { kind: 'abort-job'; }; diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index 84c515466e..957a6106ba 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -1,6 +1,6 @@ import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher'; -import type { RunningJobSummary } from '@/scaling/types'; +import type { RunningJobSummary } from '@/scaling/scaling.types'; export interface WorkerCommandReceivedHandlerOptions { queueModeId: string; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index b8012b3d4f..acfbdafd01 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -28,7 +28,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExternalHooks } from '@/external-hooks'; import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { NodeTypes } from '@/node-types'; -import type { Job, JobData, JobResult } from '@/scaling/types'; +import type { Job, JobData, JobResult } from '@/scaling/scaling.types'; import type { ScalingService } from '@/scaling/scaling.service'; import * as WorkflowHelpers from '@/workflow-helpers'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index f75e1df712..fb57cbc36b 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -40,8 +40,15 @@ export class InstanceSettings { readonly instanceId = this.generateInstanceId(); - /** Always `leader` in single-main setup. `leader` or `follower` in multi-main setup. */ - private instanceRole: InstanceRole = 'unset'; + /** + * A main is: + * - `unset` during bootup, + * - `leader` after bootup in single-main setup, + * - `leader` or `follower` after bootup in multi-main setup. + * + * A non-main instance type (e.g. `worker`) is always `unset`. + */ + instanceRole: InstanceRole = 'unset'; get isLeader() { return this.instanceRole === 'leader';