diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index e7184bdfe0..2be3031ad4 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -1,6 +1,6 @@ import { Container } from 'typedi'; import { Flags, type Config } from '@oclif/core'; -import { sleep } from 'n8n-workflow'; +import { ApplicationError, sleep } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; @@ -102,6 +102,12 @@ export class Webhook extends BaseCommand { } async run() { + if (config.getEnv('multiMainSetup.enabled')) { + throw new ApplicationError( + 'Webhook process cannot be started when multi-main setup is enabled.', + ); + } + await Container.get(Queue).init(); await this.server.start(); this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`); diff --git a/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts b/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts index 3be43ae835..5456d1d63d 100644 --- a/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts +++ b/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts @@ -1,6 +1,75 @@ -import { handleCommandMessageMain } from '../main/handleCommandMessageMain'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; +import { License } from '@/License'; +import { MessageEventBus } from '@/eventbus'; +import Container from 'typedi'; +import { Logger } from 'winston'; +import { messageToRedisServiceCommandObject, debounceMessageReceiver } from '../helpers'; +import config from '@/config'; export async function handleCommandMessageWebhook(messageString: string) { - // currently webhooks handle commands the same way as the main instance - return await handleCommandMessageMain(messageString); + const queueModeId = config.getEnv('redis.queueModeId'); + const isMainInstance = config.getEnv('generic.instanceType') === 'main'; + const message = messageToRedisServiceCommandObject(messageString); + const logger = Container.get(Logger); + + if (message) { + logger.debug( + `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, + ); + + if ( + message.senderId === queueModeId || + (message.targets && !message.targets.includes(queueModeId)) + ) { + // Skipping command message because it's not for this instance + logger.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + + switch (message.command) { + case 'reloadLicense': + if (!debounceMessageReceiver(message, 500)) { + message.payload = { + result: 'debounced', + }; + return message; + } + + if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { + // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently + logger.error( + 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', + ); + return message; + } + await Container.get(License).reload(); + break; + case 'restartEventBus': + if (!debounceMessageReceiver(message, 200)) { + message.payload = { + result: 'debounced', + }; + return message; + } + await Container.get(MessageEventBus).restart(); + case 'reloadExternalSecretsProviders': + if (!debounceMessageReceiver(message, 200)) { + message.payload = { + result: 'debounced', + }; + return message; + } + await Container.get(ExternalSecretsManager).reloadAllProviders(); + break; + + default: + break; + } + + return message; + } + + return; }