diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index e8a47e10e0..a0f9e10f80 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -4,7 +4,8 @@ import { Container } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; -import { OrchestrationHandlerWebhookService } from '@/services/orchestration/webhook/orchestration.handler.webhook.service'; +import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service'; import { WebhookServer } from '@/webhooks/webhook-server'; @@ -110,6 +111,11 @@ export class Webhook extends BaseCommand { async initOrchestration() { await Container.get(OrchestrationWebhookService).init(); - await Container.get(OrchestrationHandlerWebhookService).init(); + + const subscriber = Container.get(Subscriber); + await subscriber.subscribe('n8n.commands'); + subscriber.setCommandMessageHandler(); + + Container.get(PubSubHandler).init(); } } diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts new file mode 100644 index 0000000000..c637b2faf9 --- /dev/null +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -0,0 +1,142 @@ +import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; + +import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; +import { EventService } from '@/events/event.service'; +import type { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; +import type { License } from '@/license'; +import type { CommunityPackagesService } from '@/services/community-packages.service'; + +import { PubSubHandler } from '../pubsub/pubsub-handler'; + +describe('PubSubHandler', () => { + const eventService = new EventService(); + const license = mock(); + const eventbus = mock(); + const externalSecretsManager = mock(); + const communityPackagesService = mock(); + + describe('in webhook process', () => { + const instanceSettings = mock({ instanceType: 'webhook' }); + + it('should set up handlers in webhook process', () => { + // @ts-expect-error Spying on private method + const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers'); + + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + expect(setupWebhookHandlersSpy).toHaveBeenCalled(); + }); + + it('should reload license on `reload-license` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + eventService.emit('reload-license'); + + expect(license.reload).toHaveBeenCalled(); + }); + + it('should restart event bus on `restart-event-bus` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + eventService.emit('restart-event-bus'); + + expect(eventbus.restart).toHaveBeenCalled(); + }); + + it('should reload providers on `reload-external-secrets-providers` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + eventService.emit('reload-external-secrets-providers'); + + expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); + }); + + it('should install community package on `community-package-install` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + eventService.emit('community-package-install', { + packageName: 'test-package', + packageVersion: '1.0.0', + }); + + expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( + 'test-package', + '1.0.0', + ); + }); + + it('should update community package on `community-package-update` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + eventService.emit('community-package-update', { + packageName: 'test-package', + packageVersion: '1.0.0', + }); + + expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( + 'test-package', + '1.0.0', + ); + }); + + it('should uninstall community package on `community-package-uninstall` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + ).init(); + + eventService.emit('community-package-uninstall', { + packageName: 'test-package', + }); + + expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); + }); + }); +}); diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts index 14e67a8d4d..31e8486b8c 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -17,14 +17,14 @@ describe('Subscriber', () => { describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const subscriber = new Subscriber(mock(), redisClientService); + const subscriber = new Subscriber(mock(), redisClientService, mock()); expect(subscriber.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const subscriber = new Subscriber(mock(), redisClientService); + const subscriber = new Subscriber(mock(), redisClientService, mock()); expect(subscriber.getClient()).toBeUndefined(); }); @@ -32,7 +32,7 @@ describe('Subscriber', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const subscriber = new Subscriber(mock(), redisClientService); + const subscriber = new Subscriber(mock(), redisClientService, mock()); subscriber.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -40,7 +40,7 @@ describe('Subscriber', () => { describe('subscribe', () => { it('should subscribe to pubsub channel', async () => { - const subscriber = new Subscriber(mock(), redisClientService); + const subscriber = new Subscriber(mock(), redisClientService, mock()); await subscriber.subscribe('n8n.commands'); @@ -50,7 +50,7 @@ describe('Subscriber', () => { describe('setMessageHandler', () => { it('should set message handler function for channel', () => { - const subscriber = new Subscriber(mock(), redisClientService); + const subscriber = new Subscriber(mock(), redisClientService, mock()); const channel = 'n8n.commands'; const handlerFn = jest.fn(); diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts new file mode 100644 index 0000000000..8b7a91e4dd --- /dev/null +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -0,0 +1,61 @@ +import { InstanceSettings } from 'n8n-core'; +import { Service } from 'typedi'; + +import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; +import { EventService } from '@/events/event.service'; +import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map'; +import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; +import { License } from '@/license'; +import { CommunityPackagesService } from '@/services/community-packages.service'; + +/** + * Responsible for handling events emitted from messages received via a pubsub channel. + */ +@Service() +export class PubSubHandler { + constructor( + private readonly eventService: EventService, + private readonly instanceSettings: InstanceSettings, + private readonly license: License, + private readonly eventbus: MessageEventBus, + private readonly externalSecretsManager: ExternalSecretsManager, + private readonly communityPackagesService: CommunityPackagesService, + ) {} + + init() { + if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers(); + } + + private setupHandlers( + map: { + [EventName in EventNames]?: (event: PubSubEventMap[EventName]) => void | Promise; + }, + ) { + for (const [eventName, handlerFn] of Object.entries(map) as Array< + [EventNames, (event: PubSubEventMap[EventNames]) => void | Promise] + >) { + this.eventService.on(eventName, async (event) => { + await handlerFn(event); + }); + } + } + + // #region Webhook process + + private setupWebhookHandlers() { + this.setupHandlers({ + 'reload-license': async () => await this.license.reload(), + 'restart-event-bus': async () => await this.eventbus.restart(), + 'reload-external-secrets-providers': async () => + await this.externalSecretsManager.reloadAllProviders(), + 'community-package-install': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-update': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-uninstall': async ({ packageName }) => + await this.communityPackagesService.removeNpmPackage(packageName), + }); + } + + // #endregion +} diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index fe3b22b479..f9a2567f8d 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -1,7 +1,10 @@ import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; +import debounce from 'lodash/debounce'; +import { jsonParse } from 'n8n-workflow'; import { Service } from 'typedi'; import config from '@/config'; +import { EventService } from '@/events/event.service'; import { Logger } from '@/logging/logger.service'; import { RedisClientService } from '@/services/redis-client.service'; @@ -21,6 +24,7 @@ export class Subscriber { constructor( private readonly logger: Logger, private readonly redisClientService: RedisClientService, + private readonly eventService: EventService, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; @@ -62,4 +66,39 @@ export class Subscriber { } // #endregion + + // #region Commands + + setCommandMessageHandler() { + const handlerFn = debounce((str: string) => { + const msg = this.parseCommandMessage(str); + if (msg) this.eventService.emit(msg.command, msg.payload); + }, 300); + + this.setMessageHandler('n8n.commands', handlerFn); + } + + private parseCommandMessage(str: string) { + const msg = jsonParse(str, { fallbackValue: null }); + + if (!msg) { + this.logger.debug('Received invalid string via command channel', { message: str }); + + return null; + } + + this.logger.debug('Received message via command channel', msg); + + const queueModeId = config.getEnv('redis.queueModeId'); + + if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) { + this.logger.debug('Disregarding message - not for this instance', msg); + + return null; + } + + return msg; + } + + // #endregion } diff --git a/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts b/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts deleted file mode 100644 index 3555139a99..0000000000 --- a/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { InstanceSettings } from 'n8n-core'; -import Container from 'typedi'; -import { Logger } from 'winston'; - -import config from '@/config'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; -import { License } from '@/license'; -import { CommunityPackagesService } from '@/services/community-packages.service'; - -import { messageToRedisServiceCommandObject, debounceMessageReceiver } from '../helpers'; - -export async function handleCommandMessageWebhook(messageString: string) { - const queueModeId = config.getEnv('redis.queueModeId'); - const isMainInstance = Container.get(InstanceSettings).instanceType === 'main'; - const message = messageToRedisServiceCommandObject(messageString); - const logger = Container.get(Logger); - - if (message) { - logger.debug( - `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, - ); - - if ( - message.senderId === queueModeId || - (message.targets && !message.targets.includes(queueModeId)) - ) { - // Skipping command message because it's not for this instance - logger.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - - switch (message.command) { - case 'reload-license': - if (!debounceMessageReceiver(message, 500)) { - return { ...message, payload: { result: 'debounced' } }; - } - - if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { - // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently - logger.error( - 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', - ); - return message; - } - await Container.get(License).reload(); - break; - case 'restart-event-bus': - if (!debounceMessageReceiver(message, 200)) { - return { ...message, payload: { result: 'debounced' } }; - } - await Container.get(MessageEventBus).restart(); - case 'reload-external-secrets-providers': - if (!debounceMessageReceiver(message, 200)) { - return { ...message, payload: { result: 'debounced' } }; - } - await Container.get(ExternalSecretsManager).reloadAllProviders(); - break; - case 'community-package-install': - case 'community-package-update': - case 'community-package-uninstall': - if (!debounceMessageReceiver(message, 200)) { - return message; - } - const { packageName } = message.payload; - const communityPackagesService = Container.get(CommunityPackagesService); - if (message.command === 'community-package-uninstall') { - await communityPackagesService.removeNpmPackage(packageName); - } else { - await communityPackagesService.installOrUpdateNpmPackage( - packageName, - message.payload.packageVersion, - ); - } - break; - - default: - break; - } - - return message; - } - - return; -} diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts deleted file mode 100644 index de7bded68e..0000000000 --- a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { Service } from 'typedi'; - -import { Subscriber } from '@/scaling/pubsub/subscriber.service'; - -import { handleCommandMessageWebhook } from './handle-command-message-webhook'; -import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; - -@Service() -export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService { - constructor(private readonly subscriber: Subscriber) { - super(); - } - - async initSubscriber() { - await this.subscriber.subscribe('n8n.commands'); - - this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageWebhook); - } -}