diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts index 9b3eb53452..14e67a8d4d 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -9,6 +9,7 @@ import { Subscriber } from '../pubsub/subscriber.service'; describe('Subscriber', () => { beforeEach(() => { config.set('executions.mode', 'queue'); + jest.restoreAllMocks(); }); const client = mock(); @@ -47,14 +48,16 @@ describe('Subscriber', () => { }); }); - describe('setHandler', () => { - it('should set handler function', () => { + describe('setMessageHandler', () => { + it('should set message handler function for channel', () => { const subscriber = new Subscriber(mock(), redisClientService); + const channel = 'n8n.commands'; const handlerFn = jest.fn(); - subscriber.addMessageHandler(handlerFn); + subscriber.setMessageHandler(channel, handlerFn); - expect(client.on).toHaveBeenCalledWith('message', handlerFn); + // @ts-expect-error Private field + expect(subscriber.handlers).toEqual(new Map([[channel, handlerFn]])); }); }); }); diff --git a/packages/cli/src/scaling/constants.ts b/packages/cli/src/scaling/constants.ts index 4cf0563be3..f1e55d7ab1 100644 --- a/packages/cli/src/scaling/constants.ts +++ b/packages/cli/src/scaling/constants.ts @@ -2,6 +2,8 @@ export const QUEUE_NAME = 'jobs'; export const JOB_TYPE_NAME = 'job'; +/** Pubsub channel for commands sent by a main process to workers or to other main processes. */ export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands'; +/** Pubsub channel for messages sent by workers in response to commands from main processes. */ export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response'; diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index 9a02cab6ea..191f4b62d9 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -4,15 +4,11 @@ import type { IWorkflowDb } from '@/interfaces'; import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants'; -/** - * Pubsub channel used by scaling mode: - * - * - `n8n.commands` for messages sent by a main process to command workers or other main processes - * - `n8n.worker-response` for messages sent by workers in response to commands from main processes - */ -export type ScalingPubSubChannel = - | typeof COMMAND_PUBSUB_CHANNEL - | typeof WORKER_RESPONSE_PUBSUB_CHANNEL; +/** Pubsub channel used by scaling mode. */ +export type PubSubChannel = typeof COMMAND_PUBSUB_CHANNEL | typeof WORKER_RESPONSE_PUBSUB_CHANNEL; + +/** Handler function for every message received via a `PubSubChannel`. */ +export type PubSubHandlerFn = (msg: string) => void; export type PubSubMessageMap = { // #region Lifecycle diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 83a18c761a..5d4529fdb9 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -5,7 +5,7 @@ import config from '@/config'; import { Logger } from '@/logger'; import { RedisClientService } from '@/services/redis-client.service'; -import type { ScalingPubSubChannel } from './pubsub.types'; +import type { PubSubHandlerFn, PubSubChannel } from './pubsub.types'; /** * Responsible for subscribing to the pubsub channels used by scaling mode. @@ -14,6 +14,8 @@ import type { ScalingPubSubChannel } from './pubsub.types'; export class Subscriber { private readonly client: SingleNodeClient | MultiNodeClient; + private readonly handlers: Map = new Map(); + // #region Lifecycle constructor( @@ -26,6 +28,10 @@ export class Subscriber { this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); this.client.on('error', (error) => this.logger.error(error.message)); + + this.client.on('message', (channel: PubSubChannel, message) => { + this.handlers.get(channel)?.(message); + }); } getClient() { @@ -41,7 +47,7 @@ export class Subscriber { // #region Subscribing - async subscribe(channel: ScalingPubSubChannel) { + async subscribe(channel: PubSubChannel) { await this.client.subscribe(channel, (error) => { if (error) { this.logger.error('Failed to subscribe to channel', { channel, cause: error }); @@ -52,8 +58,9 @@ export class Subscriber { }); } - addMessageHandler(handlerFn: (channel: string, msg: string) => void) { - this.client.on('message', handlerFn); + /** Set the message handler function for a channel. */ + setMessageHandler(channel: PubSubChannel, handlerFn: PubSubHandlerFn) { + this.handlers.set(channel, handlerFn); } // #endregion diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index 1ba2e4a177..7f4effdd4a 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -1,6 +1,5 @@ import { Service } from 'typedi'; -import { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { handleCommandMessageMain } from './handle-command-message-main'; @@ -18,12 +17,10 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService await this.subscriber.subscribe('n8n.commands'); await this.subscriber.subscribe('n8n.worker-response'); - this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_PUBSUB_CHANNEL) { - await handleWorkerResponseMessageMain(messageString, options); - } else if (channel === COMMAND_PUBSUB_CHANNEL) { - await handleCommandMessageMain(messageString); - } + this.subscriber.setMessageHandler('n8n.worker-response', async (message: string) => { + await handleWorkerResponseMessageMain(message, options); }); + + this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageMain); } } diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts index 0a6c9b7b45..de7bded68e 100644 --- a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts +++ b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts @@ -1,6 +1,5 @@ import { Service } from 'typedi'; -import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { handleCommandMessageWebhook } from './handle-command-message-webhook'; @@ -15,10 +14,6 @@ export class OrchestrationHandlerWebhookService extends OrchestrationHandlerServ async initSubscriber() { await this.subscriber.subscribe('n8n.commands'); - this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { - if (channel === COMMAND_PUBSUB_CHANNEL) { - await handleCommandMessageWebhook(messageString); - } - }); + this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageWebhook); } } diff --git a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts index 7d5d1cb5f2..45b6bd57c9 100644 --- a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts +++ b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts @@ -14,142 +14,142 @@ import { CommunityPackagesService } from '@/services/community-packages.service' import type { WorkerCommandReceivedHandlerOptions } from './types'; import { debounceMessageReceiver, getOsCpuString } from '../helpers'; -export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) { - // eslint-disable-next-line complexity - return async (channel: string, messageString: string) => { - if (channel === COMMAND_PUBSUB_CHANNEL) { - if (!messageString) return; - const logger = Container.get(Logger); - let message: RedisServiceCommandObject; - try { - message = jsonParse(messageString); - } catch { - logger.debug( - `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - logger.debug( - `RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`, - ); - if (message.targets && !message.targets.includes(options.queueModeId)) { - return; // early return if the message is not for this worker - } - switch (message.command) { - case 'getStatus': - if (!debounceMessageReceiver(message, 500)) return; - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'getStatus', - payload: { - workerId: options.queueModeId, - runningJobsSummary: options.getRunningJobsSummary(), - freeMem: os.freemem(), - totalMem: os.totalmem(), - uptime: process.uptime(), - loadAvg: os.loadavg(), - cpus: getOsCpuString(), - arch: os.arch(), - platform: os.platform(), - hostname: os.hostname(), - interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) => - (interfaces ?? [])?.map((net) => ({ - family: net.family, - address: net.address, - internal: net.internal, - })), - ), - version: N8N_VERSION, - }, - }); - break; - case 'getId': - if (!debounceMessageReceiver(message, 500)) return; - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'getId', - }); - break; - case 'restartEventBus': - if (!debounceMessageReceiver(message, 500)) return; - try { - await Container.get(MessageEventBus).restart(); - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'restartEventBus', - payload: { - result: 'success', - }, - }); - } catch (error) { - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'restartEventBus', - payload: { - result: 'error', - error: (error as Error).message, - }, - }); - } - break; - case 'reloadExternalSecretsProviders': - if (!debounceMessageReceiver(message, 500)) return; - try { - await Container.get(ExternalSecretsManager).reloadAllProviders(); - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'reloadExternalSecretsProviders', - payload: { - result: 'success', - }, - }); - } catch (error) { - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'reloadExternalSecretsProviders', - payload: { - result: 'error', - error: (error as Error).message, - }, - }); - } - break; - case 'community-package-install': - case 'community-package-update': - case 'community-package-uninstall': - if (!debounceMessageReceiver(message, 500)) return; - const { packageName, packageVersion } = message.payload; - const communityPackagesService = Container.get(CommunityPackagesService); - if (message.command === 'community-package-uninstall') { - await communityPackagesService.removeNpmPackage(packageName); - } else { - await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion); - } - break; - case 'reloadLicense': - if (!debounceMessageReceiver(message, 500)) return; - await Container.get(License).reload(); - break; - case 'stopWorker': - if (!debounceMessageReceiver(message, 500)) return; - // TODO: implement proper shutdown - // await this.stopProcess(); - break; - default: - if ( - message.command === 'relay-execution-lifecycle-event' || - message.command === 'clear-test-webhooks' - ) { - break; // meant only for main - } +// eslint-disable-next-line complexity +export async function getWorkerCommandReceivedHandler( + messageString: string, + options: WorkerCommandReceivedHandlerOptions, +) { + if (!messageString) return; - logger.debug( - `Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`, - ); - break; - } - } + const logger = Container.get(Logger); + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + logger.debug( + `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) { + logger.debug( + `RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`, + ); + if (message.targets && !message.targets.includes(options.queueModeId)) { + return; // early return if the message is not for this worker } - }; + switch (message.command) { + case 'getStatus': + if (!debounceMessageReceiver(message, 500)) return; + await options.publisher.publishWorkerResponse({ + workerId: options.queueModeId, + command: 'getStatus', + payload: { + workerId: options.queueModeId, + runningJobsSummary: options.getRunningJobsSummary(), + freeMem: os.freemem(), + totalMem: os.totalmem(), + uptime: process.uptime(), + loadAvg: os.loadavg(), + cpus: getOsCpuString(), + arch: os.arch(), + platform: os.platform(), + hostname: os.hostname(), + interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) => + (interfaces ?? [])?.map((net) => ({ + family: net.family, + address: net.address, + internal: net.internal, + })), + ), + version: N8N_VERSION, + }, + }); + break; + case 'getId': + if (!debounceMessageReceiver(message, 500)) return; + await options.publisher.publishWorkerResponse({ + workerId: options.queueModeId, + command: 'getId', + }); + break; + case 'restartEventBus': + if (!debounceMessageReceiver(message, 500)) return; + try { + await Container.get(MessageEventBus).restart(); + await options.publisher.publishWorkerResponse({ + workerId: options.queueModeId, + command: 'restartEventBus', + payload: { + result: 'success', + }, + }); + } catch (error) { + await options.publisher.publishWorkerResponse({ + workerId: options.queueModeId, + command: 'restartEventBus', + payload: { + result: 'error', + error: (error as Error).message, + }, + }); + } + break; + case 'reloadExternalSecretsProviders': + if (!debounceMessageReceiver(message, 500)) return; + try { + await Container.get(ExternalSecretsManager).reloadAllProviders(); + await options.publisher.publishWorkerResponse({ + workerId: options.queueModeId, + command: 'reloadExternalSecretsProviders', + payload: { + result: 'success', + }, + }); + } catch (error) { + await options.publisher.publishWorkerResponse({ + workerId: options.queueModeId, + command: 'reloadExternalSecretsProviders', + payload: { + result: 'error', + error: (error as Error).message, + }, + }); + } + break; + case 'community-package-install': + case 'community-package-update': + case 'community-package-uninstall': + if (!debounceMessageReceiver(message, 500)) return; + const { packageName, packageVersion } = message.payload; + const communityPackagesService = Container.get(CommunityPackagesService); + if (message.command === 'community-package-uninstall') { + await communityPackagesService.removeNpmPackage(packageName); + } else { + await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion); + } + break; + case 'reloadLicense': + if (!debounceMessageReceiver(message, 500)) return; + await Container.get(License).reload(); + break; + case 'stopWorker': + if (!debounceMessageReceiver(message, 500)) return; + // TODO: implement proper shutdown + // await this.stopProcess(); + break; + default: + if ( + message.command === 'relay-execution-lifecycle-event' || + message.command === 'clear-test-webhooks' + ) { + break; // meant only for main + } + + logger.debug( + `Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`, + ); + break; + } + } } diff --git a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts index a1356df5de..06113d7344 100644 --- a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts +++ b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts @@ -14,6 +14,9 @@ export class OrchestrationHandlerWorkerService extends OrchestrationHandlerServi async initSubscriber(options: WorkerCommandReceivedHandlerOptions) { await this.subscriber.subscribe('n8n.commands'); - this.subscriber.addMessageHandler(getWorkerCommandReceivedHandler(options)); + + this.subscriber.setMessageHandler('n8n.commands', async (message: string) => { + await getWorkerCommandReceivedHandler(message, options); + }); } }