refactor(core): Remove dead pubsub code (#11180)

This commit is contained in:
Iván Ovejero 2024-10-09 12:56:06 +02:00 committed by GitHub
parent d69842c87d
commit 518e320404
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 13 additions and 92 deletions

View file

@ -28,11 +28,4 @@ export class OrchestrationController {
if (!this.licenseService.isWorkerViewLicensed()) return; if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerStatus(); return await this.orchestrationService.getWorkerStatus();
} }
@GlobalScope('orchestration:list')
@Post('/worker/ids')
async getWorkerIdsAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerIds();
}
} }

View file

@ -80,25 +80,5 @@ export type PubSubCommandMap = {
}; };
export type PubSubWorkerResponseMap = { export type PubSubWorkerResponseMap = {
// #region Lifecycle
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus; 'get-worker-status': WorkerStatus;
// #endregion
}; };

View file

@ -61,7 +61,7 @@ describe('Publisher', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService); const publisher = new Publisher(mock(), redisClientService);
const msg = mock<PubSub.WorkerResponse>({ const msg = mock<PubSub.WorkerResponse>({
command: 'reload-external-secrets-providers', command: 'get-worker-status',
}); });
await publisher.publishWorkerResponse(msg); await publisher.publishWorkerResponse(msg);

View file

@ -195,7 +195,6 @@ describe('PubSubHandler', () => {
'community-package-update': expect.any(Function), 'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function), 'community-package-uninstall': expect.any(Function),
'get-worker-status': expect.any(Function), 'get-worker-status': expect.any(Function),
'get-worker-id': expect.any(Function),
}); });
}); });
@ -266,25 +265,5 @@ describe('PubSubHandler', () => {
expect(workerStatus.generateStatus).toHaveBeenCalled(); expect(workerStatus.generateStatus).toHaveBeenCalled();
}); });
it('should get worker ID on `get-worker-id` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('get-worker-id');
expect(publisher.publishWorkerResponse).toHaveBeenCalledWith({
workerId: expect.any(String),
command: 'get-worker-id',
});
});
}); });
}); });

View file

@ -43,11 +43,6 @@ export class PubSubHandler {
command: 'get-worker-status', command: 'get-worker-status',
payload: this.workerStatus.generateStatus(), payload: this.workerStatus.generateStatus(),
}), }),
'get-worker-id': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-id',
}),
}); });
break; break;
case 'main': case 'main':

View file

@ -86,18 +86,6 @@ export namespace PubSub {
_ToWorkerResponse<WorkerResponseKey> _ToWorkerResponse<WorkerResponseKey>
>; >;
namespace WorkerResponses {
export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>;
export type ReloadExternalSecretsProviders =
ToWorkerResponse<'reload-external-secrets-providers'>;
export type GetWorkerId = ToWorkerResponse<'get-worker-id'>;
export type GetWorkerStatus = ToWorkerResponse<'get-worker-status'>;
}
/** Response sent via the `n8n.worker-response` pubsub channel. */ /** Response sent via the `n8n.worker-response` pubsub channel. */
export type WorkerResponse = export type WorkerResponse = ToWorkerResponse<'get-worker-status'>;
| WorkerResponses.RestartEventBus
| WorkerResponses.ReloadExternalSecretsProviders
| WorkerResponses.GetWorkerId
| WorkerResponses.GetWorkerStatus;
} }

View file

@ -1,3 +1,4 @@
import type { WorkerStatus } from '@n8n/api-types';
import type Redis from 'ioredis'; import type Redis from 'ioredis';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core'; import { InstanceSettings } from 'n8n-core';
@ -34,12 +35,10 @@ mockInstance(ActiveWorkflowManager);
let queueModeId: string; let queueModeId: string;
const workerRestartEventBusResponse: PubSub.WorkerResponse = { const workerStatusResponse: PubSub.WorkerResponse = {
workerId: 'test', workerId: 'test',
command: 'restart-event-bus', command: 'get-worker-status',
payload: { payload: mock<WorkerStatus>(),
result: 'success',
},
}; };
describe('Orchestration Service', () => { describe('Orchestration Service', () => {
@ -74,10 +73,10 @@ describe('Orchestration Service', () => {
test('should handle worker responses', async () => { test('should handle worker responses', async () => {
const response = await handleWorkerResponseMessageMain( const response = await handleWorkerResponseMessageMain(
JSON.stringify(workerRestartEventBusResponse), JSON.stringify(workerStatusResponse),
mock<MainResponseReceivedHandlerOptions>(), mock<MainResponseReceivedHandlerOptions>(),
); );
expect(response?.command).toEqual('restart-event-bus'); expect(response?.command).toEqual('get-worker-status');
}); });
test('should handle command messages from others', async () => { test('should handle command messages from others', async () => {
@ -94,10 +93,10 @@ describe('Orchestration Service', () => {
test('should reject command messages from itself', async () => { test('should reject command messages from itself', async () => {
const response = await handleCommandMessageMain( const response = await handleCommandMessageMain(
JSON.stringify({ ...workerRestartEventBusResponse, senderId: queueModeId }), JSON.stringify({ ...workerStatusResponse, senderId: queueModeId }),
); );
expect(response).toBeDefined(); expect(response).toBeDefined();
expect(response!.command).toEqual('restart-event-bus'); expect(response!.command).toEqual('get-worker-status');
expect(response!.senderId).toEqual(queueModeId); expect(response!.senderId).toEqual(queueModeId);
expect(eventBus.restart).not.toHaveBeenCalled(); expect(eventBus.restart).not.toHaveBeenCalled();
}); });
@ -105,7 +104,7 @@ describe('Orchestration Service', () => {
test('should send command messages', async () => { test('should send command messages', async () => {
// @ts-expect-error Private field // @ts-expect-error Private field
jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {}); jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {});
await os.getWorkerIds(); await os.getWorkerStatus();
// @ts-expect-error Private field // @ts-expect-error Private field
expect(os.publisher.publishCommand).toHaveBeenCalled(); expect(os.publisher.publishCommand).toHaveBeenCalled();
// @ts-expect-error Private field // @ts-expect-error Private field

View file

@ -128,16 +128,6 @@ export class OrchestrationService {
}); });
} }
async getWorkerIds() {
if (!this.sanityCheck()) return;
const command = 'get-worker-id';
this.logger.debug(`Sending "${command}" to command channel`);
await this.publisher.publishCommand({ command });
}
// ---------------------------------- // ----------------------------------
// activations // activations
// ---------------------------------- // ----------------------------------

View file

@ -4,6 +4,7 @@ import Container from 'typedi';
import { Logger } from '@/logging/logger.service'; import { Logger } from '@/logging/logger.service';
import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import type { PubSub } from '@/scaling/pubsub/pubsub.types'; import type { PubSub } from '@/scaling/pubsub/pubsub.types';
import { assertNever } from '@/utils';
import type { MainResponseReceivedHandlerOptions } from './types'; import type { MainResponseReceivedHandlerOptions } from './types';
import { Push } from '../../../push'; import { Push } from '../../../push';
@ -32,12 +33,8 @@ export async function handleWorkerResponseMessageMain(
status: workerResponse.payload, status: workerResponse.payload,
}); });
break; break;
case 'get-worker-id':
break;
default: default:
Container.get(Logger).debug( assertNever(workerResponse.command);
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
} }
return workerResponse; return workerResponse;