refactor(core): Simplify subscriber handler setters (#10896)

This commit is contained in:
Iván Ovejero 2024-09-23 14:14:00 +02:00 committed by GitHub
parent 23c09eae42
commit 5c171c4bf0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 170 additions and 167 deletions

View file

@ -9,6 +9,7 @@ import { Subscriber } from '../pubsub/subscriber.service';
describe('Subscriber', () => {
beforeEach(() => {
config.set('executions.mode', 'queue');
jest.restoreAllMocks();
});
const client = mock<SingleNodeClient>();
@ -47,14 +48,16 @@ describe('Subscriber', () => {
});
});
describe('setHandler', () => {
it('should set handler function', () => {
describe('setMessageHandler', () => {
it('should set message handler function for channel', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const channel = 'n8n.commands';
const handlerFn = jest.fn();
subscriber.addMessageHandler(handlerFn);
subscriber.setMessageHandler(channel, handlerFn);
expect(client.on).toHaveBeenCalledWith('message', handlerFn);
// @ts-expect-error Private field
expect(subscriber.handlers).toEqual(new Map([[channel, handlerFn]]));
});
});
});

View file

@ -2,6 +2,8 @@ export const QUEUE_NAME = 'jobs';
export const JOB_TYPE_NAME = 'job';
/** Pubsub channel for commands sent by a main process to workers or to other main processes. */
export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands';
/** Pubsub channel for messages sent by workers in response to commands from main processes. */
export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response';

View file

@ -4,15 +4,11 @@ import type { IWorkflowDb } from '@/interfaces';
import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants';
/**
* Pubsub channel used by scaling mode:
*
* - `n8n.commands` for messages sent by a main process to command workers or other main processes
* - `n8n.worker-response` for messages sent by workers in response to commands from main processes
*/
export type ScalingPubSubChannel =
| typeof COMMAND_PUBSUB_CHANNEL
| typeof WORKER_RESPONSE_PUBSUB_CHANNEL;
/** Pubsub channel used by scaling mode. */
export type PubSubChannel = typeof COMMAND_PUBSUB_CHANNEL | typeof WORKER_RESPONSE_PUBSUB_CHANNEL;
/** Handler function for every message received via a `PubSubChannel`. */
export type PubSubHandlerFn = (msg: string) => void;
export type PubSubMessageMap = {
// #region Lifecycle

View file

@ -5,7 +5,7 @@ import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis-client.service';
import type { ScalingPubSubChannel } from './pubsub.types';
import type { PubSubHandlerFn, PubSubChannel } from './pubsub.types';
/**
* Responsible for subscribing to the pubsub channels used by scaling mode.
@ -14,6 +14,8 @@ import type { ScalingPubSubChannel } from './pubsub.types';
export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient;
private readonly handlers: Map<PubSubChannel, PubSubHandlerFn> = new Map();
// #region Lifecycle
constructor(
@ -26,6 +28,10 @@ export class Subscriber {
this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });
this.client.on('error', (error) => this.logger.error(error.message));
this.client.on('message', (channel: PubSubChannel, message) => {
this.handlers.get(channel)?.(message);
});
}
getClient() {
@ -41,7 +47,7 @@ export class Subscriber {
// #region Subscribing
async subscribe(channel: ScalingPubSubChannel) {
async subscribe(channel: PubSubChannel) {
await this.client.subscribe(channel, (error) => {
if (error) {
this.logger.error('Failed to subscribe to channel', { channel, cause: error });
@ -52,8 +58,9 @@ export class Subscriber {
});
}
addMessageHandler(handlerFn: (channel: string, msg: string) => void) {
this.client.on('message', handlerFn);
/** Set the message handler function for a channel. */
setMessageHandler(channel: PubSubChannel, handlerFn: PubSubHandlerFn) {
this.handlers.set(channel, handlerFn);
}
// #endregion

View file

@ -1,6 +1,5 @@
import { Service } from 'typedi';
import { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageMain } from './handle-command-message-main';
@ -18,12 +17,10 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService
await this.subscriber.subscribe('n8n.commands');
await this.subscriber.subscribe('n8n.worker-response');
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_PUBSUB_CHANNEL) {
await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_PUBSUB_CHANNEL) {
await handleCommandMessageMain(messageString);
}
this.subscriber.setMessageHandler('n8n.worker-response', async (message: string) => {
await handleWorkerResponseMessageMain(message, options);
});
this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageMain);
}
}

View file

@ -1,6 +1,5 @@
import { Service } from 'typedi';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageWebhook } from './handle-command-message-webhook';
@ -15,10 +14,6 @@ export class OrchestrationHandlerWebhookService extends OrchestrationHandlerServ
async initSubscriber() {
await this.subscriber.subscribe('n8n.commands');
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === COMMAND_PUBSUB_CHANNEL) {
await handleCommandMessageWebhook(messageString);
}
});
this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageWebhook);
}
}

View file

@ -14,142 +14,142 @@ import { CommunityPackagesService } from '@/services/community-packages.service'
import type { WorkerCommandReceivedHandlerOptions } from './types';
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
// eslint-disable-next-line complexity
return async (channel: string, messageString: string) => {
if (channel === COMMAND_PUBSUB_CHANNEL) {
if (!messageString) return;
const logger = Container.get(Logger);
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
logger.debug(
`Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
logger.debug(
`RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`,
);
if (message.targets && !message.targets.includes(options.queueModeId)) {
return; // early return if the message is not for this worker
}
switch (message.command) {
case 'getStatus':
if (!debounceMessageReceiver(message, 500)) return;
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'getStatus',
payload: {
workerId: options.queueModeId,
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) =>
(interfaces ?? [])?.map((net) => ({
family: net.family,
address: net.address,
internal: net.internal,
})),
),
version: N8N_VERSION,
},
});
break;
case 'getId':
if (!debounceMessageReceiver(message, 500)) return;
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'getId',
});
break;
case 'restartEventBus':
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(MessageEventBus).restart();
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
result: 'success',
},
});
} catch (error) {
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
result: 'error',
error: (error as Error).message,
},
});
}
break;
case 'reloadExternalSecretsProviders':
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'success',
},
});
} catch (error) {
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'error',
error: (error as Error).message,
},
});
}
break;
case 'community-package-install':
case 'community-package-update':
case 'community-package-uninstall':
if (!debounceMessageReceiver(message, 500)) return;
const { packageName, packageVersion } = message.payload;
const communityPackagesService = Container.get(CommunityPackagesService);
if (message.command === 'community-package-uninstall') {
await communityPackagesService.removeNpmPackage(packageName);
} else {
await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion);
}
break;
case 'reloadLicense':
if (!debounceMessageReceiver(message, 500)) return;
await Container.get(License).reload();
break;
case 'stopWorker':
if (!debounceMessageReceiver(message, 500)) return;
// TODO: implement proper shutdown
// await this.stopProcess();
break;
default:
if (
message.command === 'relay-execution-lifecycle-event' ||
message.command === 'clear-test-webhooks'
) {
break; // meant only for main
}
// eslint-disable-next-line complexity
export async function getWorkerCommandReceivedHandler(
messageString: string,
options: WorkerCommandReceivedHandlerOptions,
) {
if (!messageString) return;
logger.debug(
`Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`,
);
break;
}
}
const logger = Container.get(Logger);
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
logger.debug(
`Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
logger.debug(
`RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`,
);
if (message.targets && !message.targets.includes(options.queueModeId)) {
return; // early return if the message is not for this worker
}
};
switch (message.command) {
case 'getStatus':
if (!debounceMessageReceiver(message, 500)) return;
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'getStatus',
payload: {
workerId: options.queueModeId,
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) =>
(interfaces ?? [])?.map((net) => ({
family: net.family,
address: net.address,
internal: net.internal,
})),
),
version: N8N_VERSION,
},
});
break;
case 'getId':
if (!debounceMessageReceiver(message, 500)) return;
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'getId',
});
break;
case 'restartEventBus':
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(MessageEventBus).restart();
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
result: 'success',
},
});
} catch (error) {
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
result: 'error',
error: (error as Error).message,
},
});
}
break;
case 'reloadExternalSecretsProviders':
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'success',
},
});
} catch (error) {
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'error',
error: (error as Error).message,
},
});
}
break;
case 'community-package-install':
case 'community-package-update':
case 'community-package-uninstall':
if (!debounceMessageReceiver(message, 500)) return;
const { packageName, packageVersion } = message.payload;
const communityPackagesService = Container.get(CommunityPackagesService);
if (message.command === 'community-package-uninstall') {
await communityPackagesService.removeNpmPackage(packageName);
} else {
await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion);
}
break;
case 'reloadLicense':
if (!debounceMessageReceiver(message, 500)) return;
await Container.get(License).reload();
break;
case 'stopWorker':
if (!debounceMessageReceiver(message, 500)) return;
// TODO: implement proper shutdown
// await this.stopProcess();
break;
default:
if (
message.command === 'relay-execution-lifecycle-event' ||
message.command === 'clear-test-webhooks'
) {
break; // meant only for main
}
logger.debug(
`Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`,
);
break;
}
}
}

View file

@ -14,6 +14,9 @@ export class OrchestrationHandlerWorkerService extends OrchestrationHandlerServi
async initSubscriber(options: WorkerCommandReceivedHandlerOptions) {
await this.subscriber.subscribe('n8n.commands');
this.subscriber.addMessageHandler(getWorkerCommandReceivedHandler(options));
this.subscriber.setMessageHandler('n8n.commands', async (message: string) => {
await getWorkerCommandReceivedHandler(message, options);
});
}
}