mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-24 20:24:05 -08:00
refactor(core): Include self-sending and debouncing in pubsub commands (#11149)
This commit is contained in:
parent
1ca14946d9
commit
1ded08bf7e
|
@ -52,7 +52,7 @@ describe('Publisher', () => {
|
||||||
|
|
||||||
expect(client.publish).toHaveBeenCalledWith(
|
expect(client.publish).toHaveBeenCalledWith(
|
||||||
'n8n.commands',
|
'n8n.commands',
|
||||||
JSON.stringify({ ...msg, senderId: queueModeId }),
|
JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -7,3 +7,17 @@ export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands';
|
||||||
|
|
||||||
/** Pubsub channel for messages sent by workers in response to commands from main processes. */
|
/** Pubsub channel for messages sent by workers in response to commands from main processes. */
|
||||||
export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response';
|
export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commands that should be sent to the sender as well, e.g. during workflow activation and
|
||||||
|
* deactivation in multi-main setup. */
|
||||||
|
export const SELF_SEND_COMMANDS = new Set([
|
||||||
|
'add-webhooks-triggers-and-pollers',
|
||||||
|
'remove-triggers-and-pollers',
|
||||||
|
]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commands that should not be debounced when received, e.g. during webhook handling in
|
||||||
|
* multi-main setup.
|
||||||
|
*/
|
||||||
|
export const IMMEDIATE_COMMANDS = new Set(['relay-execution-lifecycle-event']);
|
||||||
|
|
|
@ -6,6 +6,7 @@ import { Logger } from '@/logging/logger.service';
|
||||||
import { RedisClientService } from '@/services/redis-client.service';
|
import { RedisClientService } from '@/services/redis-client.service';
|
||||||
|
|
||||||
import type { PubSub } from './pubsub.types';
|
import type { PubSub } from './pubsub.types';
|
||||||
|
import { IMMEDIATE_COMMANDS, SELF_SEND_COMMANDS } from '../constants';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responsible for publishing messages into the pubsub channels used by scaling mode.
|
* Responsible for publishing messages into the pubsub channels used by scaling mode.
|
||||||
|
@ -43,7 +44,12 @@ export class Publisher {
|
||||||
async publishCommand(msg: Omit<PubSub.Command, 'senderId'>) {
|
async publishCommand(msg: Omit<PubSub.Command, 'senderId'>) {
|
||||||
await this.client.publish(
|
await this.client.publish(
|
||||||
'n8n.commands',
|
'n8n.commands',
|
||||||
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
|
JSON.stringify({
|
||||||
|
...msg,
|
||||||
|
senderId: config.getEnv('redis.queueModeId'),
|
||||||
|
selfSend: SELF_SEND_COMMANDS.has(msg.command),
|
||||||
|
debounce: !IMMEDIATE_COMMANDS.has(msg.command),
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
this.logger.debug(`Published ${msg.command} to command channel`);
|
this.logger.debug(`Published ${msg.command} to command channel`);
|
||||||
|
|
|
@ -22,6 +22,12 @@ export namespace PubSub {
|
||||||
senderId: string;
|
senderId: string;
|
||||||
targets?: string[];
|
targets?: string[];
|
||||||
command: CommandKey;
|
command: CommandKey;
|
||||||
|
|
||||||
|
/** Whether the command should be sent to the sender as well. */
|
||||||
|
selfSend?: boolean;
|
||||||
|
|
||||||
|
/** Whether the command should be debounced when received. */
|
||||||
|
debounce?: boolean;
|
||||||
} & (PubSubCommandMap[CommandKey] extends never
|
} & (PubSubCommandMap[CommandKey] extends never
|
||||||
? { payload?: never } // some commands carry no payload
|
? { payload?: never } // some commands carry no payload
|
||||||
: { payload: PubSubCommandMap[CommandKey] });
|
: { payload: PubSubCommandMap[CommandKey] });
|
||||||
|
|
|
@ -70,12 +70,15 @@ export class Subscriber {
|
||||||
// #region Commands
|
// #region Commands
|
||||||
|
|
||||||
setCommandMessageHandler() {
|
setCommandMessageHandler() {
|
||||||
const handlerFn = debounce((str: string) => {
|
const handlerFn = (msg: PubSub.Command) => this.eventService.emit(msg.command, msg.payload);
|
||||||
const msg = this.parseCommandMessage(str);
|
const debouncedHandlerFn = debounce(handlerFn, 300);
|
||||||
if (msg) this.eventService.emit(msg.command, msg.payload);
|
|
||||||
}, 300);
|
|
||||||
|
|
||||||
this.setMessageHandler('n8n.commands', handlerFn);
|
this.setMessageHandler('n8n.commands', (str: string) => {
|
||||||
|
const msg = this.parseCommandMessage(str);
|
||||||
|
if (!msg) return;
|
||||||
|
if (msg.debounce) debouncedHandlerFn(msg);
|
||||||
|
else handlerFn(msg);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private parseCommandMessage(str: string) {
|
private parseCommandMessage(str: string) {
|
||||||
|
@ -91,7 +94,10 @@ export class Subscriber {
|
||||||
|
|
||||||
const queueModeId = config.getEnv('redis.queueModeId');
|
const queueModeId = config.getEnv('redis.queueModeId');
|
||||||
|
|
||||||
if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) {
|
if (
|
||||||
|
!msg.selfSend &&
|
||||||
|
(msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId)))
|
||||||
|
) {
|
||||||
this.logger.debug('Disregarding message - not for this instance', msg);
|
this.logger.debug('Disregarding message - not for this instance', msg);
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -27,17 +27,11 @@ export async function handleCommandMessageMain(messageString: string) {
|
||||||
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
|
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
const selfSendingAllowed = [
|
|
||||||
'add-webhooks-triggers-and-pollers',
|
|
||||||
'remove-triggers-and-pollers',
|
|
||||||
].includes(message.command);
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
!selfSendingAllowed &&
|
!message.selfSend &&
|
||||||
(message.senderId === queueModeId ||
|
(message.senderId === queueModeId ||
|
||||||
(message.targets && !message.targets.includes(queueModeId)))
|
(message.targets && !message.targets.includes(queueModeId)))
|
||||||
) {
|
) {
|
||||||
// Skipping command message because it's not for this instance
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
`Skipping command message ${message.command} because it's not for this instance.`,
|
`Skipping command message ${message.command} because it's not for this instance.`,
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in a new issue