From 1ded08bf7e07241f4f6eab3d584687449ec21a31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 8 Oct 2024 11:18:12 +0200 Subject: [PATCH] refactor(core): Include self-sending and debouncing in pubsub commands (#11149) --- .../__tests__/publisher.service.test.ts | 2 +- packages/cli/src/scaling/constants.ts | 14 ++++++++++++++ .../src/scaling/pubsub/publisher.service.ts | 8 +++++++- .../cli/src/scaling/pubsub/pubsub.types.ts | 6 ++++++ .../src/scaling/pubsub/subscriber.service.ts | 18 ++++++++++++------ .../main/handle-command-message-main.ts | 8 +------- 6 files changed, 41 insertions(+), 15 deletions(-) diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index f26c93cc42..439af01ef9 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -52,7 +52,7 @@ describe('Publisher', () => { expect(client.publish).toHaveBeenCalledWith( 'n8n.commands', - JSON.stringify({ ...msg, senderId: queueModeId }), + JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }), ); }); }); diff --git a/packages/cli/src/scaling/constants.ts b/packages/cli/src/scaling/constants.ts index f1e55d7ab1..348f156896 100644 --- a/packages/cli/src/scaling/constants.ts +++ b/packages/cli/src/scaling/constants.ts @@ -7,3 +7,17 @@ export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands'; /** Pubsub channel for messages sent by workers in response to commands from main processes. */ export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response'; + +/** + * Commands that should be sent to the sender as well, e.g. during workflow activation and + * deactivation in multi-main setup. */ +export const SELF_SEND_COMMANDS = new Set([ + 'add-webhooks-triggers-and-pollers', + 'remove-triggers-and-pollers', +]); + +/** + * Commands that should not be debounced when received, e.g. during webhook handling in + * multi-main setup. + */ +export const IMMEDIATE_COMMANDS = new Set(['relay-execution-lifecycle-event']); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index 7a35b94c3e..bfcede6542 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -6,6 +6,7 @@ import { Logger } from '@/logging/logger.service'; import { RedisClientService } from '@/services/redis-client.service'; import type { PubSub } from './pubsub.types'; +import { IMMEDIATE_COMMANDS, SELF_SEND_COMMANDS } from '../constants'; /** * Responsible for publishing messages into the pubsub channels used by scaling mode. @@ -43,7 +44,12 @@ export class Publisher { async publishCommand(msg: Omit) { await this.client.publish( 'n8n.commands', - JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }), + JSON.stringify({ + ...msg, + senderId: config.getEnv('redis.queueModeId'), + selfSend: SELF_SEND_COMMANDS.has(msg.command), + debounce: !IMMEDIATE_COMMANDS.has(msg.command), + }), ); this.logger.debug(`Published ${msg.command} to command channel`); diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index ac83659212..be38cc98f8 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -22,6 +22,12 @@ export namespace PubSub { senderId: string; targets?: string[]; command: CommandKey; + + /** Whether the command should be sent to the sender as well. */ + selfSend?: boolean; + + /** Whether the command should be debounced when received. */ + debounce?: boolean; } & (PubSubCommandMap[CommandKey] extends never ? { payload?: never } // some commands carry no payload : { payload: PubSubCommandMap[CommandKey] }); diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index f9a2567f8d..7586b52ebc 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -70,12 +70,15 @@ export class Subscriber { // #region Commands setCommandMessageHandler() { - const handlerFn = debounce((str: string) => { - const msg = this.parseCommandMessage(str); - if (msg) this.eventService.emit(msg.command, msg.payload); - }, 300); + const handlerFn = (msg: PubSub.Command) => this.eventService.emit(msg.command, msg.payload); + const debouncedHandlerFn = debounce(handlerFn, 300); - this.setMessageHandler('n8n.commands', handlerFn); + this.setMessageHandler('n8n.commands', (str: string) => { + const msg = this.parseCommandMessage(str); + if (!msg) return; + if (msg.debounce) debouncedHandlerFn(msg); + else handlerFn(msg); + }); } private parseCommandMessage(str: string) { @@ -91,7 +94,10 @@ export class Subscriber { const queueModeId = config.getEnv('redis.queueModeId'); - if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) { + if ( + !msg.selfSend && + (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) + ) { this.logger.debug('Disregarding message - not for this instance', msg); return null; diff --git a/packages/cli/src/services/orchestration/main/handle-command-message-main.ts b/packages/cli/src/services/orchestration/main/handle-command-message-main.ts index 909f5976a5..7af2fa6f56 100644 --- a/packages/cli/src/services/orchestration/main/handle-command-message-main.ts +++ b/packages/cli/src/services/orchestration/main/handle-command-message-main.ts @@ -27,17 +27,11 @@ export async function handleCommandMessageMain(messageString: string) { `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, ); - const selfSendingAllowed = [ - 'add-webhooks-triggers-and-pollers', - 'remove-triggers-and-pollers', - ].includes(message.command); - if ( - !selfSendingAllowed && + !message.selfSend && (message.senderId === queueModeId || (message.targets && !message.targets.includes(queueModeId))) ) { - // Skipping command message because it's not for this instance logger.debug( `Skipping command message ${message.command} because it's not for this instance.`, );