import { Flags, type Config } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';

import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { WebhookServer } from '@/webhooks/webhook-server';

import { BaseCommand } from './base-command';

/**
 * CLI command that boots a dedicated n8n webhook process.
 *
 * The process refuses to start unless the execution mode is `queue`, and it
 * refuses to run when the multi-main setup is enabled.
 */
export class Webhook extends BaseCommand {
	static description = 'Starts n8n webhook process. Intercepts only production URLs.';

	static examples = ['$ n8n webhook'];

	static flags = {
		help: Flags.help({ char: 'h' }),
	};

	/** Server instance resolved from the DI container; started in `run()`. */
	protected server = Container.get(WebhookServer);

	override needsCommunityPackages = true;

	/**
	 * Builds the command, logs the queue mode id when one is already present,
	 * and then asks the base command to set the instance queue mode id.
	 *
	 * @param argv Raw command-line arguments, forwarded to the base command.
	 * @param cmdConfig oclif configuration, forwarded to the base command.
	 */
	constructor(argv: string[], cmdConfig: Config) {
		super(argv, cmdConfig);

		const { queueModeId } = this;
		if (queueModeId) {
			this.logger.debug(`Webhook Instance queue mode id: ${queueModeId}`);
		}

		this.setInstanceQueueModeId();
	}

	/**
	 * Gracefully stops the webhook process.
	 *
	 * Runs the `n8n.stop` external hooks (when external hooks are available),
	 * then shuts down the active executions. Any failure in either step is
	 * reported through `exitWithCrash`; afterwards `exitSuccessFully` is invoked.
	 */
	async stopProcess() {
		this.logger.info('\nStopping n8n...');

		try {
			await this.externalHooks?.run('n8n.stop', []);

			const activeExecutions = Container.get(ActiveExecutions);
			await activeExecutions.shutdown();
		} catch (shutdownError) {
			await this.exitWithCrash('There was an error shutting down n8n.', shutdownError);
		}

		await this.exitSuccessFully();
	}

	/**
	 * Prepares everything the webhook process needs before `run()`.
	 *
	 * Order matters: the crash journal comes first, then the base command's
	 * own initialization, then each subsystem one after another.
	 */
	async init() {
		const executionMode = config.getEnv('executions.mode');
		if (executionMode !== 'queue') {
			// Running without queues is technically possible, but two bugs are
			// known in that setup:
			//  - The executions list becomes unreliable: the main process does
			//    not know about executions running in webhook processes, cannot
			//    tell whether they are still running or have crashed, and
			//    therefore shows all of them as errored.
			//  - Jobs currently executing in a webhook process cannot be
			//    stopped, because without queues the main process has no way
			//    to tell webhook processes to interrupt a workflow execution.
			this.error('Webhook processes can only run with execution mode as queue.');
		}

		await this.initCrashJournal();
		this.logger.debug('Crash journal initialized');

		this.logger.info('Initializing n8n webhook process');
		this.logger.debug(`Queue mode id: ${this.queueModeId}`);

		await super.init();

		// Subsystems are initialized strictly in sequence; each one logs its
		// completion message before the next one starts.
		const stages = [
			{ execute: () => this.initLicense(), doneMessage: 'License init complete' },
			{ execute: () => this.initOrchestration(), doneMessage: 'Orchestration init complete' },
			{
				execute: () => this.initBinaryDataService(),
				doneMessage: 'Binary data service init complete',
			},
			{
				execute: () => this.initDataDeduplicationService(),
				doneMessage: 'Data deduplication service init complete',
			},
			{ execute: () => this.initExternalHooks(), doneMessage: 'External hooks init complete' },
			{ execute: () => this.initExternalSecrets(), doneMessage: 'External secrets init complete' },
		];

		for (const stage of stages) {
			await stage.execute();
			this.logger.debug(stage.doneMessage);
		}
	}

	/**
	 * Sets up the scaling queue, starts the webhook server, and then waits
	 * forever so the process stays alive.
	 *
	 * @throws ApplicationError when the multi-main setup is enabled.
	 */
	async run() {
		const isMultiMainEnabled = config.getEnv('multiMainSetup.enabled');
		if (isMultiMainEnabled) {
			throw new ApplicationError(
				'Webhook process cannot be started when multi-main setup is enabled.',
			);
		}

		// The scaling service module is loaded on demand rather than at file load.
		const scalingModule = await import('@/scaling/scaling.service');
		const scalingService = Container.get(scalingModule.ScalingService);
		await scalingService.setupQueue();

		await this.server.start();
		this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
		this.logger.info('Webhook listener waiting for requests.');

		// This promise never settles, which keeps the process from closing.
		const keepAlive = new Promise(() => {});
		await keepAlive;
	}

	/**
	 * Error handler for the command: delegates to `exitWithCrash`.
	 *
	 * @param error The error that escaped the command.
	 */
	async catch(error: Error) {
		await this.exitWithCrash('Exiting due to an error.', error);
	}

	/**
	 * Initializes the webhook orchestration service and the pub/sub handler,
	 * then subscribes to the `n8n.commands` channel and installs the command
	 * message handler on the subscriber.
	 */
	async initOrchestration() {
		const orchestrationService = Container.get(OrchestrationWebhookService);
		await orchestrationService.init();

		const pubsubHandler = Container.get(PubSubHandler);
		pubsubHandler.init();

		const commandSubscriber = Container.get(Subscriber);
		await commandSubscriber.subscribe('n8n.commands');
		commandSubscriber.setCommandMessageHandler();
	}
}