From 518e32040414436f75370f9f63b76b4e429735ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 9 Oct 2024 12:56:06 +0200 Subject: [PATCH] refactor(core): Remove dead pubsub code (#11180) --- .../controllers/orchestration.controller.ts | 7 ------- .../cli/src/events/maps/pub-sub.event-map.ts | 20 ------------------ .../__tests__/publisher.service.test.ts | 2 +- .../scaling/__tests__/pubsub-handler.test.ts | 21 ------------------- .../cli/src/scaling/pubsub/pubsub-handler.ts | 5 ----- .../cli/src/scaling/pubsub/pubsub.types.ts | 14 +------------ .../__tests__/orchestration.service.test.ts | 19 ++++++++--------- .../cli/src/services/orchestration.service.ts | 10 --------- .../handle-worker-response-message-main.ts | 7 ++----- 9 files changed, 13 insertions(+), 92 deletions(-) diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index a5235d1169..db1d690a3e 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -28,11 +28,4 @@ export class OrchestrationController { if (!this.licenseService.isWorkerViewLicensed()) return; return await this.orchestrationService.getWorkerStatus(); } - - @GlobalScope('orchestration:list') - @Post('/worker/ids') - async getWorkerIdsAll() { - if (!this.licenseService.isWorkerViewLicensed()) return; - return await this.orchestrationService.getWorkerIds(); - } } diff --git a/packages/cli/src/events/maps/pub-sub.event-map.ts b/packages/cli/src/events/maps/pub-sub.event-map.ts index 9237e79d13..a1134470bb 100644 --- a/packages/cli/src/events/maps/pub-sub.event-map.ts +++ b/packages/cli/src/events/maps/pub-sub.event-map.ts @@ -80,25 +80,5 @@ export type PubSubCommandMap = { }; export type PubSubWorkerResponseMap = { - // #region Lifecycle - - 'restart-event-bus': { - result: 'success' | 'error'; - error?: string; - }; - - 'reload-external-secrets-providers': { - result: 'success' | 'error'; - error?: string; - }; - - // #endregion - - // #region Worker view - - 'get-worker-id': never; - 'get-worker-status': WorkerStatus; - - // #endregion }; diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index 439af01ef9..3386a5c648 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -61,7 +61,7 @@ describe('Publisher', () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { const publisher = new Publisher(mock(), redisClientService); const msg = mock({ - command: 'reload-external-secrets-providers', + command: 'get-worker-status', }); await publisher.publishWorkerResponse(msg); diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index 0cf8d5ef48..fdb7c8dd23 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -195,7 +195,6 @@ describe('PubSubHandler', () => { 'community-package-update': expect.any(Function), 'community-package-uninstall': expect.any(Function), 'get-worker-status': expect.any(Function), - 'get-worker-id': expect.any(Function), }); }); @@ -266,25 +265,5 @@ describe('PubSubHandler', () => { expect(workerStatus.generateStatus).toHaveBeenCalled(); }); - - it('should get worker ID on `get-worker-id` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatus, - ).init(); - - eventService.emit('get-worker-id'); - - expect(publisher.publishWorkerResponse).toHaveBeenCalledWith({ - workerId: expect.any(String), - command: 'get-worker-id', - }); - }); }); }); diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 267cae6977..215549e1b9 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -43,11 +43,6 @@ export class PubSubHandler { command: 'get-worker-status', payload: this.workerStatus.generateStatus(), }), - 'get-worker-id': async () => - await this.publisher.publishWorkerResponse({ - workerId: config.getEnv('redis.queueModeId'), - command: 'get-worker-id', - }), }); break; case 'main': diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index be38cc98f8..a356503348 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -86,18 +86,6 @@ export namespace PubSub { _ToWorkerResponse >; - namespace WorkerResponses { - export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>; - export type ReloadExternalSecretsProviders = - ToWorkerResponse<'reload-external-secrets-providers'>; - export type GetWorkerId = ToWorkerResponse<'get-worker-id'>; - export type GetWorkerStatus = ToWorkerResponse<'get-worker-status'>; - } - /** Response sent via the `n8n.worker-response` pubsub channel. */ - export type WorkerResponse = - | WorkerResponses.RestartEventBus - | WorkerResponses.ReloadExternalSecretsProviders - | WorkerResponses.GetWorkerId - | WorkerResponses.GetWorkerStatus; + export type WorkerResponse = ToWorkerResponse<'get-worker-status'>; } diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index 7bbd797000..1fab4705bb 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -1,3 +1,4 @@ +import type { WorkerStatus } from '@n8n/api-types'; import type Redis from 'ioredis'; import { mock } from 'jest-mock-extended'; import { InstanceSettings } from 'n8n-core'; @@ -34,12 +35,10 @@ mockInstance(ActiveWorkflowManager); let queueModeId: string; -const workerRestartEventBusResponse: PubSub.WorkerResponse = { +const workerStatusResponse: PubSub.WorkerResponse = { workerId: 'test', - command: 'restart-event-bus', - payload: { - result: 'success', - }, + command: 'get-worker-status', + payload: mock(), }; describe('Orchestration Service', () => { @@ -74,10 +73,10 @@ describe('Orchestration Service', () => { test('should handle worker responses', async () => { const response = await handleWorkerResponseMessageMain( - JSON.stringify(workerRestartEventBusResponse), + JSON.stringify(workerStatusResponse), mock(), ); - expect(response?.command).toEqual('restart-event-bus'); + expect(response?.command).toEqual('get-worker-status'); }); test('should handle command messages from others', async () => { @@ -94,10 +93,10 @@ describe('Orchestration Service', () => { test('should reject command messages from itself', async () => { const response = await handleCommandMessageMain( - JSON.stringify({ ...workerRestartEventBusResponse, senderId: queueModeId }), + JSON.stringify({ ...workerStatusResponse, senderId: queueModeId }), ); expect(response).toBeDefined(); - expect(response!.command).toEqual('restart-event-bus'); + expect(response!.command).toEqual('get-worker-status'); expect(response!.senderId).toEqual(queueModeId); expect(eventBus.restart).not.toHaveBeenCalled(); }); @@ -105,7 +104,7 @@ describe('Orchestration Service', () => { test('should send command messages', async () => { // @ts-expect-error Private field jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {}); - await os.getWorkerIds(); + await os.getWorkerStatus(); // @ts-expect-error Private field expect(os.publisher.publishCommand).toHaveBeenCalled(); // @ts-expect-error Private field diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 666fe48ac6..b8aba46285 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -128,16 +128,6 @@ export class OrchestrationService { }); } - async getWorkerIds() { - if (!this.sanityCheck()) return; - - const command = 'get-worker-id'; - - this.logger.debug(`Sending "${command}" to command channel`); - - await this.publisher.publishCommand({ command }); - } - // ---------------------------------- // activations // ---------------------------------- diff --git a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts index a3b5912fb4..1a99382c19 100644 --- a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts +++ b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts @@ -4,6 +4,7 @@ import Container from 'typedi'; import { Logger } from '@/logging/logger.service'; import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; import type { PubSub } from '@/scaling/pubsub/pubsub.types'; +import { assertNever } from '@/utils'; import type { MainResponseReceivedHandlerOptions } from './types'; import { Push } from '../../../push'; @@ -32,12 +33,8 @@ export async function handleWorkerResponseMessageMain( status: workerResponse.payload, }); break; - case 'get-worker-id': - break; default: - Container.get(Logger).debug( - `Received worker response ${workerResponse.command} from ${workerResponse.workerId}`, - ); + assertNever(workerResponse.command); } return workerResponse;