diff --git a/packages/@n8n/api-types/src/scaling.ts b/packages/@n8n/api-types/src/scaling.ts index 8e15f06804..f0c3627e84 100644 --- a/packages/@n8n/api-types/src/scaling.ts +++ b/packages/@n8n/api-types/src/scaling.ts @@ -11,7 +11,7 @@ export type RunningJobSummary = { }; export type WorkerStatus = { - workerId: string; + senderId: string; runningJobsSummary: RunningJobSummary[]; freeMem: number; totalMem: number; diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 070ba4f0cb..428f451fdc 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -23,9 +23,9 @@ import { ExecutionService } from '@/executions/execution.service'; import { License } from '@/license'; import { SingleMainTaskManager } from '@/runners/task-managers/single-main-task-manager'; import { TaskManager } from '@/runners/task-managers/task-manager'; -import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Server } from '@/server'; -import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { OrchestrationService } from '@/services/orchestration.service'; import { OwnershipService } from '@/services/ownership.service'; import { PruningService } from '@/services/pruning.service'; @@ -254,10 +254,11 @@ export class Start extends BaseCommand { await orchestrationService.init(); - await Container.get(OrchestrationHandlerMainService).initWithOptions({ - queueModeId: this.queueModeId, - publisher: Container.get(Publisher), - }); + Container.get(PubSubHandler).init(); + + const subscriber = Container.get(Subscriber); + await subscriber.subscribe('n8n.commands'); + await subscriber.subscribe('n8n.worker-response'); if (!orchestrationService.isMultiMainSetupEnabled) return; diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index d88fd3f5d4..8c601c7ebc 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -115,8 +115,6 @@ export class Webhook extends BaseCommand { await Container.get(OrchestrationWebhookService).init(); Container.get(PubSubHandler).init(); - const subscriber = Container.get(Subscriber); - await subscriber.subscribe('n8n.commands'); - subscriber.setCommandMessageHandler(); + await Container.get(Subscriber).subscribe('n8n.commands'); } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 546a6dcc77..528951be4a 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -132,9 +132,7 @@ export class Worker extends BaseCommand { await Container.get(OrchestrationWorkerService).init(); Container.get(PubSubHandler).init(); - const subscriber = Container.get(Subscriber); - await subscriber.subscribe('n8n.commands'); - subscriber.setCommandMessageHandler(); + await Container.get(Subscriber).subscribe('n8n.commands'); } async setConcurrency() { diff --git a/packages/cli/src/events/maps/pub-sub.event-map.ts b/packages/cli/src/events/maps/pub-sub.event-map.ts index a1134470bb..ff27741b9b 100644 --- a/packages/cli/src/events/maps/pub-sub.event-map.ts +++ b/packages/cli/src/events/maps/pub-sub.event-map.ts @@ -1,4 +1,4 @@ -import type { WorkerStatus, PushType } from '@n8n/api-types'; +import type { PushType, WorkerStatus } from '@n8n/api-types'; import type { IWorkflowDb } from '@/interfaces'; @@ -80,5 +80,5 @@ export type PubSubCommandMap = { }; export type PubSubWorkerResponseMap = { - 'get-worker-status': WorkerStatus; + 'response-to-get-worker-status': WorkerStatus; }; diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index 3386a5c648..05bb52bc6a 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -61,7 +61,7 @@ describe('Publisher', () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { const publisher = new Publisher(mock(), redisClientService); const msg = mock({ - command: 'get-worker-status', + response: 'response-to-get-worker-status', }); await publisher.publishWorkerResponse(msg); diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index fdb7c8dd23..314ded0b8b 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -1,15 +1,25 @@ +import type { WorkerStatus } from '@n8n/api-types'; import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; +import type { Workflow } from 'n8n-workflow'; +import type { ActiveWorkflowManager } from '@/active-workflow-manager'; +import type { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; import type { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; +import type { IWorkflowDb } from '@/interfaces'; import type { License } from '@/license'; +import type { Push } from '@/push'; +import type { WebSocketPush } from '@/push/websocket.push'; import type { CommunityPackagesService } from '@/services/community-packages.service'; +import type { TestWebhooks } from '@/webhooks/test-webhooks'; import type { Publisher } from '../pubsub/publisher.service'; import { PubSubHandler } from '../pubsub/pubsub-handler'; -import type { WorkerStatus } from '../worker-status'; +import type { WorkerStatusService } from '../worker-status.service'; + +const flushPromises = async () => await new Promise((resolve) => setImmediate(resolve)); describe('PubSubHandler', () => { const eventService = new EventService(); @@ -18,7 +28,11 @@ describe('PubSubHandler', () => { const externalSecretsManager = mock(); const communityPackagesService = mock(); const publisher = mock(); - const workerStatus = mock(); + const workerStatusService = mock(); + const activeWorkflowManager = mock(); + const push = mock(); + const workflowRepository = mock(); + const testWebhooks = mock(); afterEach(() => { eventService.removeAllListeners(); @@ -29,7 +43,7 @@ describe('PubSubHandler', () => { it('should set up handlers in webhook process', () => { // @ts-expect-error Spying on private method - const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); + const setupHandlers = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); new PubSubHandler( eventService, @@ -39,10 +53,14 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); - expect(setupHandlersSpy).toHaveBeenCalledWith({ + expect(setupHandlers).toHaveBeenCalledWith({ 'reload-license': expect.any(Function), 'restart-event-bus': expect.any(Function), 'reload-external-secrets-providers': expect.any(Function), @@ -61,7 +79,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('reload-license'); @@ -78,7 +100,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('restart-event-bus'); @@ -95,7 +121,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('reload-external-secrets-providers'); @@ -112,7 +142,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('community-package-install', { @@ -135,7 +169,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('community-package-update', { @@ -158,7 +196,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('community-package-uninstall', { @@ -184,7 +226,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); expect(setupHandlersSpy).toHaveBeenCalledWith({ @@ -207,7 +253,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('reload-license'); @@ -224,7 +274,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('restart-event-bus'); @@ -241,7 +295,11 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('reload-external-secrets-providers'); @@ -249,6 +307,83 @@ describe('PubSubHandler', () => { expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); }); + it('should install community package on `community-package-install` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('community-package-install', { + packageName: 'test-package', + packageVersion: '1.0.0', + }); + + expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( + 'test-package', + '1.0.0', + ); + }); + + it('should update community package on `community-package-update` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('community-package-update', { + packageName: 'test-package', + packageVersion: '1.0.0', + }); + + expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( + 'test-package', + '1.0.0', + ); + }); + + it('should uninstall community package on `community-package-uninstall` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('community-package-uninstall', { + packageName: 'test-package', + }); + + expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); + }); + it('should generate status on `get-worker-status` event', () => { new PubSubHandler( eventService, @@ -258,12 +393,486 @@ describe('PubSubHandler', () => { externalSecretsManager, communityPackagesService, publisher, - workerStatus, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, ).init(); eventService.emit('get-worker-status'); - expect(workerStatus.generateStatus).toHaveBeenCalled(); + expect(workerStatusService.generateStatus).toHaveBeenCalled(); + }); + }); + + describe('in main process', () => { + const instanceSettings = mock({ + instanceType: 'main', + isLeader: true, + isFollower: false, + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should set up command and worker response handlers in main process', () => { + // @ts-expect-error Spying on private method + const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); + + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + expect(setupHandlersSpy).toHaveBeenCalledWith({ + 'reload-license': expect.any(Function), + 'restart-event-bus': expect.any(Function), + 'reload-external-secrets-providers': expect.any(Function), + 'community-package-install': expect.any(Function), + 'community-package-update': expect.any(Function), + 'community-package-uninstall': expect.any(Function), + 'add-webhooks-triggers-and-pollers': expect.any(Function), + 'remove-triggers-and-pollers': expect.any(Function), + 'display-workflow-activation': expect.any(Function), + 'display-workflow-deactivation': expect.any(Function), + 'display-workflow-activation-error': expect.any(Function), + 'relay-execution-lifecycle-event': expect.any(Function), + 'clear-test-webhooks': expect.any(Function), + 'response-to-get-worker-status': expect.any(Function), + }); + }); + + it('should reload license on `reload-license` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('reload-license'); + + expect(license.reload).toHaveBeenCalled(); + }); + + it('should restart event bus on `restart-event-bus` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('restart-event-bus'); + + expect(eventbus.restart).toHaveBeenCalled(); + }); + + it('should reload providers on `reload-external-secrets-providers` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('reload-external-secrets-providers'); + + expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); + }); + + it('should install community package on `community-package-install` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('community-package-install', { + packageName: 'test-package', + packageVersion: '1.0.0', + }); + + expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( + 'test-package', + '1.0.0', + ); + }); + + it('should update community package on `community-package-update` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('community-package-update', { + packageName: 'test-package', + packageVersion: '1.0.0', + }); + + expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( + 'test-package', + '1.0.0', + ); + }); + + it('should uninstall community package on `community-package-uninstall` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + eventService.emit('community-package-uninstall', { + packageName: 'test-package', + }); + + expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); + }); + + describe('multi-main setup', () => { + it('if leader, should handle `add-webhooks-triggers-and-pollers` event', async () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + + eventService.emit('add-webhooks-triggers-and-pollers', { workflowId }); + + await flushPromises(); + + expect(activeWorkflowManager.add).toHaveBeenCalledWith(workflowId, 'activate', undefined, { + shouldPublish: false, + }); + expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId }); + expect(publisher.publishCommand).toHaveBeenCalledWith({ + command: 'display-workflow-activation', + payload: { workflowId }, + }); + }); + + it('if follower, should skip `add-webhooks-triggers-and-pollers` event', async () => { + new PubSubHandler( + eventService, + mock({ instanceType: 'main', isLeader: false, isFollower: true }), + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + + eventService.emit('add-webhooks-triggers-and-pollers', { workflowId }); + + await flushPromises(); + + expect(activeWorkflowManager.add).not.toHaveBeenCalled(); + expect(push.broadcast).not.toHaveBeenCalled(); + expect(publisher.publishCommand).not.toHaveBeenCalled(); + }); + + it('if leader, should handle `remove-triggers-and-pollers` event', async () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + + eventService.emit('remove-triggers-and-pollers', { workflowId }); + + await flushPromises(); + + expect(activeWorkflowManager.removeActivationError).toHaveBeenCalledWith(workflowId); + expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).toHaveBeenCalledWith( + workflowId, + ); + expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId }); + expect(publisher.publishCommand).toHaveBeenCalledWith({ + command: 'display-workflow-deactivation', + payload: { workflowId }, + }); + }); + + it('if follower, should skip `remove-triggers-and-pollers` event', async () => { + new PubSubHandler( + eventService, + mock({ instanceType: 'main', isLeader: false, isFollower: true }), + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + + eventService.emit('remove-triggers-and-pollers', { workflowId }); + + await flushPromises(); + + expect(activeWorkflowManager.removeActivationError).not.toHaveBeenCalled(); + expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).not.toHaveBeenCalled(); + expect(push.broadcast).not.toHaveBeenCalled(); + expect(publisher.publishCommand).not.toHaveBeenCalled(); + }); + + it('should handle `display-workflow-activation` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + + eventService.emit('display-workflow-activation', { workflowId }); + + expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId }); + }); + + it('should handle `display-workflow-deactivation` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + + eventService.emit('display-workflow-deactivation', { workflowId }); + + expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId }); + }); + + it('should handle `display-workflow-activation-error` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workflowId = 'test-workflow-id'; + const errorMessage = 'Test error message'; + + eventService.emit('display-workflow-activation-error', { workflowId, errorMessage }); + + expect(push.broadcast).toHaveBeenCalledWith('workflowFailedToActivate', { + workflowId, + errorMessage, + }); + }); + + it('should handle `relay-execution-lifecycle-event` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const pushRef = 'test-push-ref'; + const type = 'executionStarted'; + const args = { testArg: 'value' }; + + push.getBackend.mockReturnValue( + mock({ hasPushRef: jest.fn().mockReturnValue(true) }), + ); + + eventService.emit('relay-execution-lifecycle-event', { type, args, pushRef }); + + expect(push.send).toHaveBeenCalledWith(type, args, pushRef); + }); + + it('should handle `clear-test-webhooks` event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const webhookKey = 'test-webhook-key'; + const workflowEntity = mock({ id: 'test-workflow-id' }); + const pushRef = 'test-push-ref'; + + push.getBackend.mockReturnValue( + mock({ hasPushRef: jest.fn().mockReturnValue(true) }), + ); + testWebhooks.toWorkflow.mockReturnValue(mock({ id: 'test-workflow-id' })); + + eventService.emit('clear-test-webhooks', { webhookKey, workflowEntity, pushRef }); + + expect(testWebhooks.clearTimeout).toHaveBeenCalledWith(webhookKey); + expect(testWebhooks.deactivateWebhooks).toHaveBeenCalled(); + }); + + it('should handle `response-to-get-worker-status event', () => { + new PubSubHandler( + eventService, + instanceSettings, + license, + eventbus, + externalSecretsManager, + communityPackagesService, + publisher, + workerStatusService, + activeWorkflowManager, + push, + workflowRepository, + testWebhooks, + ).init(); + + const workerStatus = mock({ senderId: 'worker-1', loadAvg: [123] }); + + eventService.emit('response-to-get-worker-status', workerStatus); + + expect(push.broadcast).toHaveBeenCalledWith('sendWorkerStatusMessage', { + workerId: workerStatus.senderId, + status: workerStatus, + }); + }); }); }); }); diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts index 31e8486b8c..62834dba33 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -47,17 +47,4 @@ describe('Subscriber', () => { expect(client.subscribe).toHaveBeenCalledWith('n8n.commands', expect.any(Function)); }); }); - - describe('setMessageHandler', () => { - it('should set message handler function for channel', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); - const channel = 'n8n.commands'; - const handlerFn = jest.fn(); - - subscriber.setMessageHandler(channel, handlerFn); - - // @ts-expect-error Private field - expect(subscriber.handlers).toEqual(new Map([[channel, handlerFn]])); - }); - }); }); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index bfcede6542..29d31989ff 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -59,7 +59,7 @@ export class Publisher { async publishWorkerResponse(msg: PubSub.WorkerResponse) { await this.client.publish('n8n.worker-response', JSON.stringify(msg)); - this.logger.debug(`Published response for ${msg.command} to worker response channel`); + this.logger.debug(`Published response ${msg.response} to worker response channel`); } // #endregion diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 215549e1b9..ca590dd2c2 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,17 +1,23 @@ import { InstanceSettings } from 'n8n-core'; +import { ensureError } from 'n8n-workflow'; import { Service } from 'typedi'; +import { ActiveWorkflowManager } from '@/active-workflow-manager'; import config from '@/config'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { License } from '@/license'; +import { Push } from '@/push'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { CommunityPackagesService } from '@/services/community-packages.service'; import { assertNever } from '@/utils'; +import { TestWebhooks } from '@/webhooks/test-webhooks'; -import { WorkerStatus } from '../worker-status'; +import type { PubSub } from './pubsub.types'; +import { WorkerStatusService } from '../worker-status.service'; /** * Responsible for handling events emitted from messages received via a pubsub channel. @@ -26,7 +32,11 @@ export class PubSubHandler { private readonly externalSecretsManager: ExternalSecretsManager, private readonly communityPackagesService: CommunityPackagesService, private readonly publisher: Publisher, - private readonly workerStatus: WorkerStatus, + private readonly workerStatusService: WorkerStatusService, + private readonly activeWorkflowManager: ActiveWorkflowManager, + private readonly push: Push, + private readonly workflowRepository: WorkflowRepository, + private readonly testWebhooks: TestWebhooks, ) {} init() { @@ -39,14 +49,23 @@ export class PubSubHandler { ...this.commonHandlers, 'get-worker-status': async () => await this.publisher.publishWorkerResponse({ - workerId: config.getEnv('redis.queueModeId'), - command: 'get-worker-status', - payload: this.workerStatus.generateStatus(), + senderId: config.getEnv('redis.queueModeId'), + response: 'response-to-get-worker-status', + payload: this.workerStatusService.generateStatus(), }), }); break; case 'main': - // TODO + this.setupHandlers({ + ...this.commonHandlers, + ...this.multiMainHandlers, + 'response-to-get-worker-status': async (payload) => + this.push.broadcast('sendWorkerStatusMessage', { + workerId: payload.senderId, + status: payload, + }), + }); + break; default: assertNever(this.instanceSettings.instanceType); @@ -67,17 +86,8 @@ export class PubSubHandler { } } - /** Handlers shared by webhook and worker processes. */ private commonHandlers: { - [K in keyof Pick< - PubSubEventMap, - | 'reload-license' - | 'restart-event-bus' - | 'reload-external-secrets-providers' - | 'community-package-install' - | 'community-package-update' - | 'community-package-uninstall' - >]: (event: PubSubEventMap[K]) => Promise; + [EventName in keyof PubSub.CommonEvents]: (event: PubSubEventMap[EventName]) => Promise; } = { 'reload-license': async () => await this.license.reload(), 'restart-event-bus': async () => await this.eventbus.restart(), @@ -90,4 +100,73 @@ export class PubSubHandler { 'community-package-uninstall': async ({ packageName }) => await this.communityPackagesService.removeNpmPackage(packageName), }; + + private multiMainHandlers: { + [EventName in keyof PubSub.MultiMainEvents]: ( + event: PubSubEventMap[EventName], + ) => Promise; + } = { + 'add-webhooks-triggers-and-pollers': async ({ workflowId }) => { + if (this.instanceSettings.isFollower) return; + + try { + await this.activeWorkflowManager.add(workflowId, 'activate', undefined, { + shouldPublish: false, // prevent leader from re-publishing message + }); + + this.push.broadcast('workflowActivated', { workflowId }); + + await this.publisher.publishCommand({ + command: 'display-workflow-activation', + payload: { workflowId }, + }); // instruct followers to show activation in UI + } catch (e) { + const error = ensureError(e); + const { message } = error; + + await this.workflowRepository.update(workflowId, { active: false }); + + this.push.broadcast('workflowFailedToActivate', { workflowId, errorMessage: message }); + + await this.publisher.publishCommand({ + command: 'display-workflow-activation-error', + payload: { workflowId, errorMessage: message }, + }); // instruct followers to show activation error in UI + } + }, + 'remove-triggers-and-pollers': async ({ workflowId }) => { + if (this.instanceSettings.isFollower) return; + + await this.activeWorkflowManager.removeActivationError(workflowId); + await this.activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId); + + this.push.broadcast('workflowDeactivated', { workflowId }); + + // instruct followers to show workflow deactivation in UI + await this.publisher.publishCommand({ + command: 'display-workflow-deactivation', + payload: { workflowId }, + }); + }, + 'display-workflow-activation': async ({ workflowId }) => + this.push.broadcast('workflowActivated', { workflowId }), + 'display-workflow-deactivation': async ({ workflowId }) => + this.push.broadcast('workflowDeactivated', { workflowId }), + 'display-workflow-activation-error': async ({ workflowId, errorMessage }) => + this.push.broadcast('workflowFailedToActivate', { workflowId, errorMessage }), + 'relay-execution-lifecycle-event': async ({ type, args, pushRef }) => { + if (!this.push.getBackend().hasPushRef(pushRef)) return; + + this.push.send(type, args, pushRef); + }, + 'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => { + if (!this.push.getBackend().hasPushRef(pushRef)) return; + + this.testWebhooks.clearTimeout(webhookKey); + + const workflow = this.testWebhooks.toWorkflow(workflowEntity); + + await this.testWebhooks.deactivateWebhooks(workflow); + }, + }; } diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index a356503348..b4d6e1a962 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -1,4 +1,8 @@ -import type { PubSubCommandMap, PubSubWorkerResponseMap } from '@/events/maps/pub-sub.event-map'; +import type { + PubSubCommandMap, + PubSubEventMap, + PubSubWorkerResponseMap, +} from '@/events/maps/pub-sub.event-map'; import type { Resolve } from '@/utlity.types'; import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants'; @@ -75,9 +79,17 @@ export namespace PubSub { // ---------------------------------- type _ToWorkerResponse = { - workerId: string; + /** ID of worker sending the response. */ + senderId: string; + + /** IDs of processes to send the response to. */ targets?: string[]; - command: WorkerResponseKey; + + /** Content of worker response. */ + response: WorkerResponseKey; + + /** Whether the command should be debounced when received. */ + debounce?: boolean; } & (PubSubWorkerResponseMap[WorkerResponseKey] extends never ? { payload?: never } // some responses carry no payload : { payload: PubSubWorkerResponseMap[WorkerResponseKey] }); @@ -87,5 +99,31 @@ export namespace PubSub { >; /** Response sent via the `n8n.worker-response` pubsub channel. */ - export type WorkerResponse = ToWorkerResponse<'get-worker-status'>; + export type WorkerResponse = ToWorkerResponse<'response-to-get-worker-status'>; + + /** + * Of all events emitted from pubsub messages, those whose handlers + * are all present in main, worker, and webhook processes. + */ + export type CommonEvents = Pick< + PubSubEventMap, + | 'reload-license' + | 'restart-event-bus' + | 'reload-external-secrets-providers' + | 'community-package-install' + | 'community-package-update' + | 'community-package-uninstall' + >; + + /** Multi-main events emitted from pubsub messages. */ + export type MultiMainEvents = Pick< + PubSubEventMap, + | 'add-webhooks-triggers-and-pollers' + | 'remove-triggers-and-pollers' + | 'display-workflow-activation' + | 'display-workflow-deactivation' + | 'display-workflow-activation-error' + | 'relay-execution-lifecycle-event' + | 'clear-test-webhooks' + >; } diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 7586b52ebc..7c7f90fb0e 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -17,8 +17,6 @@ import type { PubSub } from './pubsub.types'; export class Subscriber { private readonly client: SingleNodeClient | MultiNodeClient; - private readonly handlers: Map = new Map(); - // #region Lifecycle constructor( @@ -31,8 +29,18 @@ export class Subscriber { this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); - this.client.on('message', (channel: PubSub.Channel, message) => { - this.handlers.get(channel)?.(message); + const handlerFn = (msg: PubSub.Command | PubSub.WorkerResponse) => { + const eventName = 'command' in msg ? msg.command : msg.response; + this.eventService.emit(eventName, msg.payload); + }; + + const debouncedHandlerFn = debounce(handlerFn, 300); + + this.client.on('message', (_channel: PubSub.Channel, str) => { + const msg = this.parseMessage(str); + if (!msg) return; + if (msg.debounce) debouncedHandlerFn(msg); + else handlerFn(msg); }); } @@ -60,49 +68,31 @@ export class Subscriber { }); } - /** Set the message handler function for a channel. */ - setMessageHandler(channel: PubSub.Channel, handlerFn: PubSub.HandlerFn) { - this.handlers.set(channel, handlerFn); - } - - // #endregion - // #region Commands - setCommandMessageHandler() { - const handlerFn = (msg: PubSub.Command) => this.eventService.emit(msg.command, msg.payload); - const debouncedHandlerFn = debounce(handlerFn, 300); - - this.setMessageHandler('n8n.commands', (str: string) => { - const msg = this.parseCommandMessage(str); - if (!msg) return; - if (msg.debounce) debouncedHandlerFn(msg); - else handlerFn(msg); + private parseMessage(str: string) { + const msg = jsonParse(str, { + fallbackValue: null, }); - } - - private parseCommandMessage(str: string) { - const msg = jsonParse(str, { fallbackValue: null }); if (!msg) { - this.logger.debug('Received invalid string via command channel', { message: str }); + this.logger.debug('Received invalid string via pubsub channel', { message: str }); return null; } - this.logger.debug('Received message via command channel', msg); - const queueModeId = config.getEnv('redis.queueModeId'); if ( + 'command' in msg && !msg.selfSend && (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) ) { - this.logger.debug('Disregarding message - not for this instance', msg); - return null; } + this.logger.debug('Received message via pubsub channel', msg); + return msg; } diff --git a/packages/cli/src/scaling/worker-status.ts b/packages/cli/src/scaling/worker-status.service.ts similarity index 85% rename from packages/cli/src/scaling/worker-status.ts rename to packages/cli/src/scaling/worker-status.service.ts index cddccc7e1f..725cbb0ca7 100644 --- a/packages/cli/src/scaling/worker-status.ts +++ b/packages/cli/src/scaling/worker-status.service.ts @@ -1,3 +1,4 @@ +import type { WorkerStatus } from '@n8n/api-types'; import os from 'node:os'; import { Service } from 'typedi'; @@ -7,12 +8,12 @@ import { N8N_VERSION } from '@/constants'; import { JobProcessor } from './job-processor'; @Service() -export class WorkerStatus { +export class WorkerStatusService { constructor(private readonly jobProcessor: JobProcessor) {} - generateStatus() { + generateStatus(): WorkerStatus { return { - workerId: config.getEnv('redis.queueModeId'), + senderId: config.getEnv('redis.queueModeId'), runningJobsSummary: this.jobProcessor.getRunningJobsSummary(), freeMem: os.freemem(), totalMem: os.totalmem(), diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index 1fab4705bb..6c66573047 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -1,4 +1,3 @@ -import type { WorkerStatus } from '@n8n/api-types'; import type Redis from 'ioredis'; import { mock } from 'jest-mock-extended'; import { InstanceSettings } from 'n8n-core'; @@ -7,20 +6,12 @@ import Container from 'typedi'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; import config from '@/config'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { Push } from '@/push'; -import type { PubSub } from '@/scaling/pubsub/pubsub.types'; -import * as helpers from '@/services/orchestration/helpers'; -import { handleCommandMessageMain } from '@/services/orchestration/main/handle-command-message-main'; -import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handle-worker-response-message-main'; -import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { OrchestrationService } from '@/services/orchestration.service'; import { RedisClientService } from '@/services/redis-client.service'; import { mockInstance } from '@test/mocking'; -import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types'; - config.set('executions.mode', 'queue'); config.set('generic.instanceType', 'main'); @@ -30,21 +21,13 @@ const mockRedisClient = mock(); redisClientService.createClient.mockReturnValue(mockRedisClient); const os = Container.get(OrchestrationService); -const handler = Container.get(OrchestrationHandlerMainService); mockInstance(ActiveWorkflowManager); let queueModeId: string; -const workerStatusResponse: PubSub.WorkerResponse = { - workerId: 'test', - command: 'get-worker-status', - payload: mock(), -}; - describe('Orchestration Service', () => { mockInstance(Push); mockInstance(ExternalSecretsManager); - const eventBus = mockInstance(MessageEventBus); beforeAll(async () => { queueModeId = config.get('redis.queueModeId'); @@ -63,73 +46,11 @@ describe('Orchestration Service', () => { test('should initialize', async () => { await os.init(); - await handler.init(); // @ts-expect-error Private field expect(os.publisher).toBeDefined(); - // @ts-expect-error Private field - expect(handler.subscriber).toBeDefined(); expect(queueModeId).toBeDefined(); }); - test('should handle worker responses', async () => { - const response = await handleWorkerResponseMessageMain( - JSON.stringify(workerStatusResponse), - mock(), - ); - expect(response?.command).toEqual('get-worker-status'); - }); - - test('should handle command messages from others', async () => { - const responseFalseId = await handleCommandMessageMain( - JSON.stringify({ - senderId: 'test', - command: 'reload-license', - }), - ); - expect(responseFalseId).toBeDefined(); - expect(responseFalseId!.command).toEqual('reload-license'); - expect(responseFalseId!.senderId).toEqual('test'); - }); - - test('should reject command messages from itself', async () => { - const response = await handleCommandMessageMain( - JSON.stringify({ ...workerStatusResponse, senderId: queueModeId }), - ); - expect(response).toBeDefined(); - expect(response!.command).toEqual('get-worker-status'); - expect(response!.senderId).toEqual(queueModeId); - expect(eventBus.restart).not.toHaveBeenCalled(); - }); - - test('should send command messages', async () => { - // @ts-expect-error Private field - jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {}); - await os.getWorkerStatus(); - // @ts-expect-error Private field - expect(os.publisher.publishCommand).toHaveBeenCalled(); - // @ts-expect-error Private field - jest.spyOn(os.publisher, 'publishCommand').mockRestore(); - }); - - test('should prevent receiving commands too often', async () => { - jest.spyOn(helpers, 'debounceMessageReceiver'); - const res1 = await handleCommandMessageMain( - JSON.stringify({ - senderId: 'test', - command: 'reload-external-secrets-providers', - }), - ); - const res2 = await handleCommandMessageMain( - JSON.stringify({ - senderId: 'test', - command: 'reload-external-secrets-providers', - }), - ); - expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2); - expect(res1!.payload).toBeUndefined(); - expect(res2!.payload).toEqual({ result: 'debounced' }); - }); - describe('shouldAddWebhooks', () => { test('should return true for init', () => { // We want to ensure that webhooks are populated on init diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts deleted file mode 100644 index f36bb4adf9..0000000000 --- a/packages/cli/src/services/orchestration/helpers.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { jsonParse } from 'n8n-workflow'; -import os from 'node:os'; -import { Container } from 'typedi'; - -import { Logger } from '@/logging/logger.service'; -import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; -import type { PubSub } from '@/scaling/pubsub/pubsub.types'; - -export interface RedisServiceCommandLastReceived { - [date: string]: Date; -} - -export function messageToRedisServiceCommandObject(messageString: string) { - if (!messageString) return; - let message: PubSub.Command; - try { - message = jsonParse(messageString); - } catch { - Container.get(Logger).debug( - `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, - ); - return; - } - return message; -} - -const lastReceived: RedisServiceCommandLastReceived = {}; - -export function debounceMessageReceiver(message: PubSub.Command, timeout: number = 100) { - const now = new Date(); - const lastReceivedDate = lastReceived[message.command]; - if (lastReceivedDate && now.getTime() - lastReceivedDate.getTime() < timeout) { - return false; - } - lastReceived[message.command] = now; - return true; -} - -export function getOsCpuString(): string { - const cpus = os.cpus(); - if (cpus.length === 0) return 'no CPU info'; - return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`; -} diff --git a/packages/cli/src/services/orchestration/main/handle-command-message-main.ts b/packages/cli/src/services/orchestration/main/handle-command-message-main.ts deleted file mode 100644 index 7af2fa6f56..0000000000 --- a/packages/cli/src/services/orchestration/main/handle-command-message-main.ts +++ /dev/null @@ -1,234 +0,0 @@ -import { InstanceSettings } from 'n8n-core'; -import { Container } from 'typedi'; - -import { ActiveWorkflowManager } from '@/active-workflow-manager'; -import config from '@/config'; -import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; -import { License } from '@/license'; -import { Logger } from '@/logging/logger.service'; -import { Push } from '@/push'; -import { CommunityPackagesService } from '@/services/community-packages.service'; -import { OrchestrationService } from '@/services/orchestration.service'; -import { TestWebhooks } from '@/webhooks/test-webhooks'; - -import { debounceMessageReceiver, messageToRedisServiceCommandObject } from '../helpers'; - -// eslint-disable-next-line complexity -export async function handleCommandMessageMain(messageString: string) { - const queueModeId = config.getEnv('redis.queueModeId'); - const isMainInstance = Container.get(InstanceSettings).instanceType === 'main'; - const message = messageToRedisServiceCommandObject(messageString); - const logger = Container.get(Logger); - - if (message) { - logger.debug( - `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, - ); - - if ( - !message.selfSend && - (message.senderId === queueModeId || - (message.targets && !message.targets.includes(queueModeId))) - ) { - logger.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - - const push = Container.get(Push); - - switch (message.command) { - case 'reload-license': - if (!debounceMessageReceiver(message, 500)) { - return { ...message, payload: { result: 'debounced' } }; - } - - if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { - return message; // this main is the sender, so disregard - } - await Container.get(License).reload(); - break; - case 'restart-event-bus': - if (!debounceMessageReceiver(message, 200)) { - return { ...message, payload: { result: 'debounced' } }; - } - await Container.get(MessageEventBus).restart(); - case 'reload-external-secrets-providers': - if (!debounceMessageReceiver(message, 200)) { - return { ...message, payload: { result: 'debounced' } }; - } - await Container.get(ExternalSecretsManager).reloadAllProviders(); - break; - case 'community-package-install': - case 'community-package-update': - case 'community-package-uninstall': - if (!debounceMessageReceiver(message, 200)) { - return message; - } - const { packageName } = message.payload; - const communityPackagesService = Container.get(CommunityPackagesService); - if (message.command === 'community-package-uninstall') { - await communityPackagesService.removeNpmPackage(packageName); - } else { - await communityPackagesService.installOrUpdateNpmPackage( - packageName, - message.payload.packageVersion, - ); - } - break; - - case 'add-webhooks-triggers-and-pollers': { - if (!debounceMessageReceiver(message, 100)) { - return { ...message, payload: { result: 'debounced' } }; - } - - const orchestrationService = Container.get(OrchestrationService); - - if (orchestrationService.isFollower) break; - - if (typeof message.payload?.workflowId !== 'string') break; - - const { workflowId } = message.payload; - - try { - await Container.get(ActiveWorkflowManager).add(workflowId, 'activate', undefined, { - shouldPublish: false, // prevent leader re-publishing message - }); - - push.broadcast('workflowActivated', { workflowId }); - - // instruct followers to show activation in UI - await orchestrationService.publish('display-workflow-activation', { workflowId }); - } catch (error) { - if (error instanceof Error) { - await Container.get(WorkflowRepository).update(workflowId, { active: false }); - - Container.get(Push).broadcast('workflowFailedToActivate', { - workflowId, - errorMessage: error.message, - }); - - await Container.get(OrchestrationService).publish('display-workflow-activation-error', { - workflowId, - errorMessage: error.message, - }); - } - } - - break; - } - - case 'remove-triggers-and-pollers': { - if (!debounceMessageReceiver(message, 100)) { - return { ...message, payload: { result: 'debounced' } }; - } - - const orchestrationService = Container.get(OrchestrationService); - - if (orchestrationService.isFollower) break; - - if (typeof message.payload?.workflowId !== 'string') break; - - const { workflowId } = message.payload; - - const activeWorkflowManager = Container.get(ActiveWorkflowManager); - - await activeWorkflowManager.removeActivationError(workflowId); - await activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId); - - push.broadcast('workflowDeactivated', { workflowId }); - - // instruct followers to show workflow deactivation in UI - await orchestrationService.publish('display-workflow-deactivation', { workflowId }); - - break; - } - - case 'display-workflow-activation': { - if (!debounceMessageReceiver(message, 100)) { - return { ...message, payload: { result: 'debounced' } }; - } - - const { workflowId } = message.payload ?? {}; - - if (typeof workflowId !== 'string') break; - - push.broadcast('workflowActivated', { workflowId }); - - break; - } - - case 'display-workflow-deactivation': { - if (!debounceMessageReceiver(message, 100)) { - return { ...message, payload: { result: 'debounced' } }; - } - - const { workflowId } = message.payload ?? {}; - - if (typeof workflowId !== 'string') break; - - push.broadcast('workflowDeactivated', { workflowId }); - - break; - } - - case 'display-workflow-activation-error': { - if (!debounceMessageReceiver(message, 100)) { - return { ...message, payload: { result: 'debounced' } }; - } - - const { workflowId, errorMessage } = message.payload ?? {}; - - if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break; - - Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); - - break; - } - - case 'relay-execution-lifecycle-event': { - /** - * Do not debounce this - all events share the same message name. - */ - - const { type, args, pushRef } = message.payload; - - if (!push.getBackend().hasPushRef(pushRef)) break; - - push.send(type, args, pushRef); - - break; - } - - case 'clear-test-webhooks': { - if (!debounceMessageReceiver(message, 100)) { - // @ts-expect-error Legacy typing - message.payload = { result: 'debounced' }; - return message; - } - - const { webhookKey, workflowEntity, pushRef } = message.payload; - - if (!push.getBackend().hasPushRef(pushRef)) break; - - const testWebhooks = Container.get(TestWebhooks); - - testWebhooks.clearTimeout(webhookKey); - - const workflow = testWebhooks.toWorkflow(workflowEntity); - - await testWebhooks.deactivateWebhooks(workflow); - - break; - } - - default: - break; - } - return message; - } - return; -} diff --git a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts deleted file mode 100644 index 1a99382c19..0000000000 --- a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { jsonParse } from 'n8n-workflow'; -import Container from 'typedi'; - -import { Logger } from '@/logging/logger.service'; -import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; -import type { PubSub } from '@/scaling/pubsub/pubsub.types'; -import { assertNever } from '@/utils'; - -import type { MainResponseReceivedHandlerOptions } from './types'; -import { Push } from '../../../push'; - -export async function handleWorkerResponseMessageMain( - messageString: string, - options: MainResponseReceivedHandlerOptions, -) { - const workerResponse = jsonParse(messageString, { - fallbackValue: null, - }); - - if (!workerResponse) { - Container.get(Logger).debug( - `Received invalid message via channel ${WORKER_RESPONSE_PUBSUB_CHANNEL}: "${messageString}"`, - ); - return; - } - - if (workerResponse.targets && !workerResponse.targets.includes(options.queueModeId)) return; - - switch (workerResponse.command) { - case 'get-worker-status': - Container.get(Push).broadcast('sendWorkerStatusMessage', { - workerId: workerResponse.workerId, - status: workerResponse.payload, - }); - break; - default: - assertNever(workerResponse.command); - } - - return workerResponse; -} diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts deleted file mode 100644 index 7f4effdd4a..0000000000 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Service } from 'typedi'; - -import { Subscriber } from '@/scaling/pubsub/subscriber.service'; - -import { handleCommandMessageMain } from './handle-command-message-main'; -import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main'; -import type { MainResponseReceivedHandlerOptions } from './types'; -import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; - -@Service() -export class OrchestrationHandlerMainService extends OrchestrationHandlerService { - constructor(private readonly subscriber: Subscriber) { - super(); - } - - async initSubscriber(options: MainResponseReceivedHandlerOptions) { - await this.subscriber.subscribe('n8n.commands'); - await this.subscriber.subscribe('n8n.worker-response'); - - this.subscriber.setMessageHandler('n8n.worker-response', async (message: string) => { - await handleWorkerResponseMessageMain(message, options); - }); - - this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageMain); - } -} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 585d64cfb4..2326ed595a 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -10,10 +10,12 @@ import { ExternalHooks } from '@/external-hooks'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; +import { Push } from '@/push'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; +import { Telemetry } from '@/telemetry'; import { setupTestCommand } from '@test-integration/utils/test-command'; import { mockInstance } from '../../shared/mocking'; @@ -31,6 +33,8 @@ const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); mockInstance(Publisher); mockInstance(Subscriber); +mockInstance(Telemetry); +mockInstance(Push); const command = setupTestCommand(Worker); diff --git a/packages/editor-ui/src/components/Workers/WorkerCard.ee.vue b/packages/editor-ui/src/components/Workers/WorkerCard.ee.vue index 373ba8c21c..c8c7c222c9 100644 --- a/packages/editor-ui/src/components/Workers/WorkerCard.ee.vue +++ b/packages/editor-ui/src/components/Workers/WorkerCard.ee.vue @@ -63,7 +63,7 @@ onBeforeUnmount(() => { :class="stale ? [$style.cardHeading, $style.stale] : [$style.cardHeading]" data-test-id="worker-card-name" > - Name: {{ worker.workerId }} ({{ worker.hostname }})
+ Name: {{ worker.senderId }} ({{ worker.hostname }})
Average Load: {{ averageWorkerLoadFromLoadsAsString(worker.loadAvg ?? [0]) }} | Free Memory: {{ memAsGb(worker.freeMem).toFixed(2) }}GB / {{ memAsGb(worker.totalMem).toFixed(2) }}GB {{ stale ? ' (stale)' : '' }} @@ -78,7 +78,7 @@ onBeforeUnmount(() => { > - +