From 383b4765d23c10f42166a38de630d71f449f6d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 7 Oct 2024 16:19:58 +0200 Subject: [PATCH] refactor(core): Simplify worker pubsub message handler (#11086) --- packages/cli/src/commands/webhook.ts | 3 +- packages/cli/src/commands/worker.ts | 15 +- .../scaling/__tests__/pubsub-handler.test.ts | 152 ++++++++++++++++- .../cli/src/scaling/pubsub/pubsub-handler.ts | 75 ++++++--- packages/cli/src/scaling/worker-status.ts | 43 +++++ .../worker/handle-command-message-worker.ts | 153 ------------------ .../orchestration.handler.worker.service.ts | 22 --- .../integration/commands/worker.cmd.test.ts | 5 +- 8 files changed, 259 insertions(+), 209 deletions(-) create mode 100644 packages/cli/src/scaling/worker-status.ts delete mode 100644 packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts delete mode 100644 packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index a0f9e10f80..5a5d656c8c 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -112,10 +112,9 @@ export class Webhook extends BaseCommand { async initOrchestration() { await Container.get(OrchestrationWebhookService).init(); + Container.get(PubSubHandler).init(); const subscriber = Container.get(Subscriber); await subscriber.subscribe('n8n.commands'); subscriber.setCommandMessageHandler(); - - Container.get(PubSubHandler).init(); } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 8c1aabf74a..6345db6763 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { JobProcessor } from '@/scaling/job-processor'; -import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; -import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { BaseCommand } from './base-command'; @@ -128,12 +128,11 @@ export class Worker extends BaseCommand { */ async initOrchestration() { await Container.get(OrchestrationWorkerService).init(); - await Container.get(OrchestrationHandlerWorkerService).initWithOptions({ - queueModeId: this.queueModeId, - publisher: Container.get(Publisher), - getRunningJobIds: () => this.jobProcessor.getRunningJobIds(), - getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(), - }); + + Container.get(PubSubHandler).init(); + const subscriber = Container.get(Subscriber); + await subscriber.subscribe('n8n.commands'); + subscriber.setCommandMessageHandler(); } async setConcurrency() { diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index c637b2faf9..0cf8d5ef48 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -7,7 +7,9 @@ import type { ExternalSecretsManager } from '@/external-secrets/external-secrets import type { License } from '@/license'; import type { CommunityPackagesService } from '@/services/community-packages.service'; +import type { Publisher } from '../pubsub/publisher.service'; import { PubSubHandler } from '../pubsub/pubsub-handler'; +import type { WorkerStatus } from '../worker-status'; describe('PubSubHandler', () => { const eventService = new EventService(); @@ -15,13 +17,19 @@ describe('PubSubHandler', () => { const eventbus = mock(); const externalSecretsManager = mock(); const communityPackagesService = mock(); + const publisher = mock(); + const workerStatus = mock(); + + afterEach(() => { + eventService.removeAllListeners(); + }); describe('in webhook process', () => { const instanceSettings = mock({ instanceType: 'webhook' }); it('should set up handlers in webhook process', () => { // @ts-expect-error Spying on private method - const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers'); + const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); new PubSubHandler( eventService, @@ -30,9 +38,18 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); - expect(setupWebhookHandlersSpy).toHaveBeenCalled(); + expect(setupHandlersSpy).toHaveBeenCalledWith({ + 'reload-license': expect.any(Function), + 'restart-event-bus': expect.any(Function), + 'reload-external-secrets-providers': expect.any(Function), + 'community-package-install': expect.any(Function), + 'community-package-update': expect.any(Function), + 'community-package-uninstall': expect.any(Function), + }); }); it('should reload license on `reload-license` event', () => { @@ -43,6 +60,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('reload-license'); @@ -58,6 +77,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('restart-event-bus'); @@ -73,6 +94,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('reload-external-secrets-providers'); @@ -88,6 +111,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('community-package-install', { @@ -109,6 +134,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('community-package-update', { @@ -130,6 +157,8 @@ describe('PubSubHandler', () => { eventbus, externalSecretsManager, communityPackagesService, + publisher, + workerStatus, ).init(); eventService.emit('community-package-uninstall', { @@ -139,4 +168,123 @@ describe('PubSubHandler', () => { expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); }); }); + + describe('in worker process', () => { + const instanceSettings = mock({ instanceType: 'worker' }); + + it('should set up handlers in worker process', () => { + // @ts-expect-error Spying on private method + const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); + + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + expect(setupHandlersSpy).toHaveBeenCalledWith({ + 'reload-license': expect.any(Function), + 'restart-event-bus': expect.any(Function), + 'reload-external-secrets-providers': expect.any(Function), + 'community-package-install': expect.any(Function), + 'community-package-update': expect.any(Function), + 'community-package-uninstall': expect.any(Function), + 'get-worker-status': expect.any(Function), + 'get-worker-id': expect.any(Function), + }); + }); + + it('should reload license on `reload-license` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('reload-license'); + + expect(license.reload).toHaveBeenCalled(); + }); + + it('should restart event bus on `restart-event-bus` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('restart-event-bus'); + + expect(eventbus.restart).toHaveBeenCalled(); + }); + + it('should reload providers on `reload-external-secrets-providers` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('reload-external-secrets-providers'); + + expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); + }); + + it('should generate status on `get-worker-status` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('get-worker-status'); + + expect(workerStatus.generateStatus).toHaveBeenCalled(); + }); + + it('should get worker ID on `get-worker-id` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatus, + ).init(); + + eventService.emit('get-worker-id'); + + expect(publisher.publishWorkerResponse).toHaveBeenCalledWith({ + workerId: expect.any(String), + command: 'get-worker-id', + }); + }); + }); }); diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 8b7a91e4dd..267cae6977 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,12 +1,17 @@ import { InstanceSettings } from 'n8n-core'; import { Service } from 'typedi'; +import config from '@/config'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { License } from '@/license'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { CommunityPackagesService } from '@/services/community-packages.service'; +import { assertNever } from '@/utils'; + +import { WorkerStatus } from '../worker-status'; /** * Responsible for handling events emitted from messages received via a pubsub channel. @@ -20,10 +25,37 @@ export class PubSubHandler { private readonly eventbus: MessageEventBus, private readonly externalSecretsManager: ExternalSecretsManager, private readonly communityPackagesService: CommunityPackagesService, + private readonly publisher: Publisher, + private readonly workerStatus: WorkerStatus, ) {} init() { - if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers(); + switch (this.instanceSettings.instanceType) { + case 'webhook': + this.setupHandlers(this.commonHandlers); + break; + case 'worker': + this.setupHandlers({ + ...this.commonHandlers, + 'get-worker-status': async () => + await this.publisher.publishWorkerResponse({ + workerId: config.getEnv('redis.queueModeId'), + command: 'get-worker-status', + payload: this.workerStatus.generateStatus(), + }), + 'get-worker-id': async () => + await this.publisher.publishWorkerResponse({ + workerId: config.getEnv('redis.queueModeId'), + command: 'get-worker-id', + }), + }); + break; + case 'main': + // TODO + break; + default: + assertNever(this.instanceSettings.instanceType); + } } private setupHandlers( @@ -40,22 +72,27 @@ export class PubSubHandler { } } - // #region Webhook process - - private setupWebhookHandlers() { - this.setupHandlers({ - 'reload-license': async () => await this.license.reload(), - 'restart-event-bus': async () => await this.eventbus.restart(), - 'reload-external-secrets-providers': async () => - await this.externalSecretsManager.reloadAllProviders(), - 'community-package-install': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-update': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-uninstall': async ({ packageName }) => - await this.communityPackagesService.removeNpmPackage(packageName), - }); - } - - // #endregion + /** Handlers shared by webhook and worker processes. */ + private commonHandlers: { + [K in keyof Pick< + PubSubEventMap, + | 'reload-license' + | 'restart-event-bus' + | 'reload-external-secrets-providers' + | 'community-package-install' + | 'community-package-update' + | 'community-package-uninstall' + >]: (event: PubSubEventMap[K]) => Promise; + } = { + 'reload-license': async () => await this.license.reload(), + 'restart-event-bus': async () => await this.eventbus.restart(), + 'reload-external-secrets-providers': async () => + await this.externalSecretsManager.reloadAllProviders(), + 'community-package-install': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-update': async ({ packageName, packageVersion }) => + await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), + 'community-package-uninstall': async ({ packageName }) => + await this.communityPackagesService.removeNpmPackage(packageName), + }; } diff --git a/packages/cli/src/scaling/worker-status.ts b/packages/cli/src/scaling/worker-status.ts new file mode 100644 index 0000000000..cddccc7e1f --- /dev/null +++ b/packages/cli/src/scaling/worker-status.ts @@ -0,0 +1,43 @@ +import os from 'node:os'; +import { Service } from 'typedi'; + +import config from '@/config'; +import { N8N_VERSION } from '@/constants'; + +import { JobProcessor } from './job-processor'; + +@Service() +export class WorkerStatus { + constructor(private readonly jobProcessor: JobProcessor) {} + + generateStatus() { + return { + workerId: config.getEnv('redis.queueModeId'), + runningJobsSummary: this.jobProcessor.getRunningJobsSummary(), + freeMem: os.freemem(), + totalMem: os.totalmem(), + uptime: process.uptime(), + loadAvg: os.loadavg(), + cpus: this.getOsCpuString(), + arch: os.arch(), + platform: os.platform(), + hostname: os.hostname(), + interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) => + (interfaces ?? [])?.map((net) => ({ + family: net.family, + address: net.address, + internal: net.internal, + })), + ), + version: N8N_VERSION, + }; + } + + private getOsCpuString() { + const cpus = os.cpus(); + + if (cpus.length === 0) return 'no CPU info'; + + return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`; + } +} diff --git a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts deleted file mode 100644 index ae11ac96fe..0000000000 --- a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { jsonParse } from 'n8n-workflow'; -import os from 'node:os'; -import Container from 'typedi'; - -import { N8N_VERSION } from '@/constants'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; -import { License } from '@/license'; -import { Logger } from '@/logging/logger.service'; -import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; -import type { PubSub } from '@/scaling/pubsub/pubsub.types'; -import { CommunityPackagesService } from '@/services/community-packages.service'; - -import type { WorkerCommandReceivedHandlerOptions } from './types'; -import { debounceMessageReceiver, getOsCpuString } from '../helpers'; - -// eslint-disable-next-line complexity -export async function getWorkerCommandReceivedHandler( - messageString: string, - options: WorkerCommandReceivedHandlerOptions, -) { - if (!messageString) return; - - const logger = Container.get(Logger); - let message: PubSub.Command; - try { - message = jsonParse(messageString); - } catch { - logger.debug( - `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - logger.debug( - `RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`, - ); - if (message.targets && !message.targets.includes(options.queueModeId)) { - return; // early return if the message is not for this worker - } - switch (message.command) { - case 'get-worker-status': - if (!debounceMessageReceiver(message, 500)) return; - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'get-worker-status', - payload: { - workerId: options.queueModeId, - runningJobsSummary: options.getRunningJobsSummary(), - freeMem: os.freemem(), - totalMem: os.totalmem(), - uptime: process.uptime(), - loadAvg: os.loadavg(), - cpus: getOsCpuString(), - arch: os.arch(), - platform: os.platform(), - hostname: os.hostname(), - interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) => - (interfaces ?? [])?.map((net) => ({ - family: net.family, - address: net.address, - internal: net.internal, - })), - ), - version: N8N_VERSION, - }, - }); - break; - case 'get-worker-id': - if (!debounceMessageReceiver(message, 500)) return; - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'get-worker-id', - }); - break; - case 'restart-event-bus': - if (!debounceMessageReceiver(message, 500)) return; - try { - await Container.get(MessageEventBus).restart(); - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'restart-event-bus', - payload: { - result: 'success', - }, - }); - } catch (error) { - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'restart-event-bus', - payload: { - result: 'error', - error: (error as Error).message, - }, - }); - } - break; - case 'reload-external-secrets-providers': - if (!debounceMessageReceiver(message, 500)) return; - try { - await Container.get(ExternalSecretsManager).reloadAllProviders(); - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'reload-external-secrets-providers', - payload: { - result: 'success', - }, - }); - } catch (error) { - await options.publisher.publishWorkerResponse({ - workerId: options.queueModeId, - command: 'reload-external-secrets-providers', - payload: { - result: 'error', - error: (error as Error).message, - }, - }); - } - break; - case 'community-package-install': - case 'community-package-update': - case 'community-package-uninstall': - if (!debounceMessageReceiver(message, 500)) return; - const { packageName } = message.payload; - const communityPackagesService = Container.get(CommunityPackagesService); - if (message.command === 'community-package-uninstall') { - await communityPackagesService.removeNpmPackage(packageName); - } else { - await communityPackagesService.installOrUpdateNpmPackage( - packageName, - message.payload.packageVersion, - ); - } - break; - case 'reload-license': - if (!debounceMessageReceiver(message, 500)) return; - await Container.get(License).reload(); - break; - default: - if ( - message.command === 'relay-execution-lifecycle-event' || - message.command === 'clear-test-webhooks' - ) { - break; // meant only for main - } - - logger.debug( - `Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`, - ); - break; - } - } -} diff --git a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts deleted file mode 100644 index 06113d7344..0000000000 --- a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Service } from 'typedi'; - -import { Subscriber } from '@/scaling/pubsub/subscriber.service'; - -import { getWorkerCommandReceivedHandler } from './handle-command-message-worker'; -import type { WorkerCommandReceivedHandlerOptions } from './types'; -import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; - -@Service() -export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService { - constructor(private readonly subscriber: Subscriber) { - super(); - } - - async initSubscriber(options: WorkerCommandReceivedHandlerOptions) { - await this.subscriber.subscribe('n8n.commands'); - - this.subscriber.setMessageHandler('n8n.commands', async (message: string) => { - await getWorkerCommandReceivedHandler(message, options); - }); - } -} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 1ff05181b2..585d64cfb4 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -11,8 +11,8 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; -import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { setupTestCommand } from '@test-integration/utils/test-command'; @@ -27,10 +27,10 @@ const externalSecretsManager = mockInstance(ExternalSecretsManager); const license = mockInstance(License, { loadCertStr: async () => '' }); const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); -const orchestrationHandlerWorkerService = mockInstance(OrchestrationHandlerWorkerService); const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); mockInstance(Publisher); +mockInstance(Subscriber); const command = setupTestCommand(Worker); @@ -48,6 +48,5 @@ test('worker initializes all its components', async () => { expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); - expect(orchestrationHandlerWorkerService.initWithOptions).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); });