refactor(core): Simplify webhook pubsub message handler (#11048)

This commit is contained in:
Iván Ovejero 2024-10-04 09:28:53 +02:00 committed by GitHub
parent 4b0187e7e8
commit 73a0d48073
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 255 additions and 113 deletions

View file

@ -4,7 +4,8 @@ import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { OrchestrationHandlerWebhookService } from '@/services/orchestration/webhook/orchestration.handler.webhook.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { WebhookServer } from '@/webhooks/webhook-server';
@ -110,6 +111,11 @@ export class Webhook extends BaseCommand {
async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
await Container.get(OrchestrationHandlerWebhookService).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
Container.get(PubSubHandler).init();
}
}

View file

@ -0,0 +1,142 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import type { License } from '@/license';
import type { CommunityPackagesService } from '@/services/community-packages.service';
import { PubSubHandler } from '../pubsub/pubsub-handler';
describe('PubSubHandler', () => {
const eventService = new EventService();
const license = mock<License>();
const eventbus = mock<MessageEventBus>();
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();
describe('in webhook process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'webhook' });
it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers');
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
expect(setupWebhookHandlersSpy).toHaveBeenCalled();
});
it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
eventService.emit('reload-license');
expect(license.reload).toHaveBeenCalled();
});
it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
eventService.emit('restart-event-bus');
expect(eventbus.restart).toHaveBeenCalled();
});
it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
eventService.emit('reload-external-secrets-providers');
expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});
it('should install community package on `community-package-install` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
eventService.emit('community-package-install', {
packageName: 'test-package',
packageVersion: '1.0.0',
});
expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});
it('should update community package on `community-package-update` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
eventService.emit('community-package-update', {
packageName: 'test-package',
packageVersion: '1.0.0',
});
expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});
it('should uninstall community package on `community-package-uninstall` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
).init();
eventService.emit('community-package-uninstall', {
packageName: 'test-package',
});
expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
});
});

View file

@ -17,14 +17,14 @@ describe('Subscriber', () => {
describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
expect(subscriber.getClient()).toEqual(client);
});
it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
expect(subscriber.getClient()).toBeUndefined();
});
@ -32,7 +32,7 @@ describe('Subscriber', () => {
describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
@ -40,7 +40,7 @@ describe('Subscriber', () => {
describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
await subscriber.subscribe('n8n.commands');
@ -50,7 +50,7 @@ describe('Subscriber', () => {
describe('setMessageHandler', () => {
it('should set message handler function for channel', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const subscriber = new Subscriber(mock(), redisClientService, mock());
const channel = 'n8n.commands';
const handlerFn = jest.fn();

View file

@ -0,0 +1,61 @@
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { CommunityPackagesService } from '@/services/community-packages.service';
/**
* Responsible for handling events emitted from messages received via a pubsub channel.
*/
@Service()
export class PubSubHandler {
constructor(
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
private readonly license: License,
private readonly eventbus: MessageEventBus,
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
) {}
init() {
if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers();
}
private setupHandlers<EventNames extends keyof PubSubEventMap>(
map: {
[EventName in EventNames]?: (event: PubSubEventMap[EventName]) => void | Promise<void>;
},
) {
for (const [eventName, handlerFn] of Object.entries(map) as Array<
[EventNames, (event: PubSubEventMap[EventNames]) => void | Promise<void>]
>) {
this.eventService.on(eventName, async (event) => {
await handlerFn(event);
});
}
}
// #region Webhook process
private setupWebhookHandlers() {
this.setupHandlers({
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
});
}
// #endregion
}

View file

@ -1,7 +1,10 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import debounce from 'lodash/debounce';
import { jsonParse } from 'n8n-workflow';
import { Service } from 'typedi';
import config from '@/config';
import { EventService } from '@/events/event.service';
import { Logger } from '@/logging/logger.service';
import { RedisClientService } from '@/services/redis-client.service';
@ -21,6 +24,7 @@ export class Subscriber {
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly eventService: EventService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
@ -62,4 +66,39 @@ export class Subscriber {
}
// #endregion
// #region Commands
setCommandMessageHandler() {
const handlerFn = debounce((str: string) => {
const msg = this.parseCommandMessage(str);
if (msg) this.eventService.emit(msg.command, msg.payload);
}, 300);
this.setMessageHandler('n8n.commands', handlerFn);
}
private parseCommandMessage(str: string) {
const msg = jsonParse<PubSub.Command | null>(str, { fallbackValue: null });
if (!msg) {
this.logger.debug('Received invalid string via command channel', { message: str });
return null;
}
this.logger.debug('Received message via command channel', msg);
const queueModeId = config.getEnv('redis.queueModeId');
if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) {
this.logger.debug('Disregarding message - not for this instance', msg);
return null;
}
return msg;
}
// #endregion
}

View file

@ -1,87 +0,0 @@
import { InstanceSettings } from 'n8n-core';
import Container from 'typedi';
import { Logger } from 'winston';
import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { CommunityPackagesService } from '@/services/community-packages.service';
import { messageToRedisServiceCommandObject, debounceMessageReceiver } from '../helpers';
export async function handleCommandMessageWebhook(messageString: string) {
const queueModeId = config.getEnv('redis.queueModeId');
const isMainInstance = Container.get(InstanceSettings).instanceType === 'main';
const message = messageToRedisServiceCommandObject(messageString);
const logger = Container.get(Logger);
if (message) {
logger.debug(
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
);
if (
message.senderId === queueModeId ||
(message.targets && !message.targets.includes(queueModeId))
) {
// Skipping command message because it's not for this instance
logger.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'reload-license':
if (!debounceMessageReceiver(message, 500)) {
return { ...message, payload: { result: 'debounced' } };
}
if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
logger.error(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
);
return message;
}
await Container.get(License).reload();
break;
case 'restart-event-bus':
if (!debounceMessageReceiver(message, 200)) {
return { ...message, payload: { result: 'debounced' } };
}
await Container.get(MessageEventBus).restart();
case 'reload-external-secrets-providers':
if (!debounceMessageReceiver(message, 200)) {
return { ...message, payload: { result: 'debounced' } };
}
await Container.get(ExternalSecretsManager).reloadAllProviders();
break;
case 'community-package-install':
case 'community-package-update':
case 'community-package-uninstall':
if (!debounceMessageReceiver(message, 200)) {
return message;
}
const { packageName } = message.payload;
const communityPackagesService = Container.get(CommunityPackagesService);
if (message.command === 'community-package-uninstall') {
await communityPackagesService.removeNpmPackage(packageName);
} else {
await communityPackagesService.installOrUpdateNpmPackage(
packageName,
message.payload.packageVersion,
);
}
break;
default:
break;
}
return message;
}
return;
}

View file

@ -1,19 +0,0 @@
import { Service } from 'typedi';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageWebhook } from './handle-command-message-webhook';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
@Service()
export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService {
constructor(private readonly subscriber: Subscriber) {
super();
}
async initSubscriber() {
await this.subscriber.subscribe('n8n.commands');
this.subscriber.setMessageHandler('n8n.commands', handleCommandMessageWebhook);
}
}