/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ import { Flags, type Config } from '@oclif/core'; import glob from 'fast-glob'; import { createReadStream, createWriteStream, existsSync } from 'fs'; import { mkdir } from 'fs/promises'; import { jsonParse, randomString, type IWorkflowExecutionDataProcess } from 'n8n-workflow'; import path from 'path'; import replaceStream from 'replacestream'; import { pipeline } from 'stream/promises'; import { Container } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; import config from '@/config'; import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { SettingsRepository } from '@/databases/repositories/settings.repository'; import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; import { License } from '@/license'; import { SingleMainTaskManager } from '@/runners/task-managers/single-main-task-manager'; import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Server } from '@/server'; import { OrchestrationService } from '@/services/orchestration.service'; import { OwnershipService } from '@/services/ownership.service'; import { PruningService } from '@/services/pruning.service'; import { UrlService } from '@/services/url.service'; import { WaitTracker } from '@/wait-tracker'; import { WorkflowRunner } from '@/workflow-runner'; import { BaseCommand } from './base-command'; // 
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');

/**
 * CLI command `n8n start`.
 *
 * `init()` prepares the services the main process depends on, `run()` starts
 * the server and activates workflows, and `stopProcess()` tears everything
 * down again.
 */
export class Start extends BaseCommand {
	static description = 'Starts n8n. Makes Web-UI available and starts active workflows';

	static examples = [
		'$ n8n start',
		'$ n8n start --tunnel',
		'$ n8n start -o',
		'$ n8n start --tunnel -o',
	];

	static flags = {
		help: Flags.help({ char: 'h' }),
		open: Flags.boolean({
			char: 'o',
			description: 'opens the UI automatically in browser',
		}),
		tunnel: Flags.boolean({
			description:
				'runs the webhooks via a hooks.n8n.cloud tunnel server. Use only for testing and development!',
		}),
		reinstallMissingPackages: Flags.boolean({
			description:
				'Attempts to self heal n8n if packages with nodes are missing. Might drastically increase startup times.',
		}),
	};

	/** Assigned in `init()` after `super.init()` has completed. */
	protected activeWorkflowManager: ActiveWorkflowManager;

	protected server = Container.get(Server);

	override needsCommunityPackages = true;

	constructor(argv: string[], cmdConfig: Config) {
		super(argv, cmdConfig);
		this.setInstanceQueueModeId();
	}

	/**
	 * Opens the UI in browser.
	 *
	 * If the URL cannot be opened, the URL is logged so the user can open it manually.
	 */
	private openBrowser() {
		const editorUrl = Container.get(UrlService).baseUrl;

		open(editorUrl, { wait: true }).catch(() => {
			this.logger.info(
				`\nWas not able to open URL in browser. Please open manually by visiting:\n${editorUrl}\n`,
			);
		});
	}

	/**
	 * Stop n8n in a graceful way.
	 * Make for example sure that all the webhooks from third party services
	 * get removed.
	 *
	 * Any error thrown during shutdown is handed to `exitWithCrash`.
	 */
	async stopProcess() {
		this.logger.info('\nStopping n8n...');

		try {
			// Stop with trying to activate workflows that could not be activated
			this.activeWorkflowManager.removeAllQueuedWorkflowActivations();

			Container.get(WaitTracker).stopTracking();

			await this.externalHooks?.run('n8n.stop', []);

			await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();

			if (Container.get(OrchestrationService).isMultiMainSetupEnabled) {
				await Container.get(OrchestrationService).shutdown();
			}

			Container.get(EventService).emit('instance-stopped');

			await Container.get(ActiveExecutions).shutdown();

			// Finally shut down Event Bus
			await Container.get(MessageEventBus).close();
		} catch (error) {
			await this.exitWithCrash('There was an error shutting down n8n.', error);
		}

		await this.exitSuccessFully();
	}

	/**
	 * Copies the editor UI files (`index.html` plus every `.js`/`.css` file) from
	 * `EDITOR_UI_DIST_DIR` into the instance's static cache directory, replacing
	 * the base-path placeholders with the configured path on the way.
	 *
	 * For `index.html` it additionally substitutes the REST endpoint placeholder
	 * and injects one `<script>` tag per configured external frontend hook URL
	 * directly after the closing `</title>` tag.
	 */
	private async generateStaticAssets() {
		// Read the index file and replace the path placeholder
		const n8nPath = this.globalConfig.path;
		const hooksUrls = config.getEnv('externalFrontendHooksUrls');

		// `externalFrontendHooksUrls` is a `;`-separated list; each entry becomes a script tag.
		let scriptsString = '';
		if (hooksUrls) {
			scriptsString = hooksUrls
				.split(';')
				.map((hookUrl) => `<script src="${hookUrl}"></script>`)
				.join('');
		}

		// Anchor in index.html after which the hook script tags are inserted.
		const closingTitleTag = '</title>';
		const { staticCacheDir } = this.instanceSettings;

		const compileFile = async (fileName: string) => {
			const filePath = path.join(EDITOR_UI_DIST_DIR, fileName);
			if (/(index\.html)|.*\.(js|css)/.test(filePath) && existsSync(filePath)) {
				const destFile = path.join(staticCacheDir, fileName);
				await mkdir(path.dirname(destFile), { recursive: true });

				// The placeholder may appear raw, URL-encoded, or double URL-encoded.
				const streams = [
					createReadStream(filePath, 'utf-8'),
					replaceStream('/{{BASE_PATH}}/', n8nPath, { ignoreCase: false }),
					replaceStream('/%7B%7BBASE_PATH%7D%7D/', n8nPath, { ignoreCase: false }),
					replaceStream('/%257B%257BBASE_PATH%257D%257D/', n8nPath, { ignoreCase: false }),
					replaceStream('/static/', n8nPath + 'static/', { ignoreCase: false }),
				];

				if (filePath.endsWith('index.html')) {
					streams.push(
						replaceStream('{{REST_ENDPOINT}}', this.globalConfig.endpoints.rest, {
							ignoreCase: false,
						}),
						replaceStream(closingTitleTag, closingTitleTag + scriptsString, {
							ignoreCase: false,
						}),
					);
				}

				streams.push(createWriteStream(destFile, 'utf-8'));
				return await pipeline(streams);
			}
		};

		await compileFile('index.html');
		const files = await glob('**/*.{css,js}', { cwd: EDITOR_UI_DIST_DIR });
		await Promise.all(files.map(compileFile));
	}

	/**
	 * Initializes everything the main process needs before `run()`: crash
	 * journal, base command init, license, orchestration, wait tracker, binary
	 * data, data deduplication, external hooks and secrets, workflow history,
	 * static UI assets (unless the UI is disabled) and task runners (unless
	 * disabled).
	 */
	async init() {
		await this.initCrashJournal();

		this.logger.info('Initializing n8n process');
		if (config.getEnv('executions.mode') === 'queue') {
			this.logger.debug('Main Instance running in queue mode');
			this.logger.debug(`Queue mode id: ${this.queueModeId}`);
		}

		const { flags } = await this.parse(Start);
		const { communityPackages } = this.globalConfig.nodes;
		// cli flag overrides the config env variable
		if (flags.reinstallMissingPackages) {
			if (communityPackages.enabled) {
				this.logger.warn(
					'`--reinstallMissingPackages` is deprecated: Please use the env variable `N8N_REINSTALL_MISSING_PACKAGES` instead',
				);
				communityPackages.reinstallMissing = true;
			} else {
				this.logger.warn(
					'`--reinstallMissingPackages` was passed, but community packages are disabled',
				);
			}
		}

		await super.init();
		this.activeWorkflowManager = Container.get(ActiveWorkflowManager);

		await this.initLicense();

		await this.initOrchestration();
		this.logger.debug('Orchestration init complete');

		if (!config.getEnv('license.autoRenewEnabled') && this.instanceSettings.isLeader) {
			this.logger.warn(
				'Automatic license renewal is disabled. The license will not renew automatically, and access to licensed features may be lost!',
			);
		}

		Container.get(WaitTracker).init();
		this.logger.debug('Wait tracker init complete');
		await this.initBinaryDataService();
		this.logger.debug('Binary data service init complete');
		await this.initDataDeduplicationService();
		this.logger.debug('Data deduplication service init complete');
		await this.initExternalHooks();
		this.logger.debug('External hooks init complete');
		await this.initExternalSecrets();
		this.logger.debug('External secrets init complete');
		this.initWorkflowHistory();
		this.logger.debug('Workflow history init complete');

		if (!this.globalConfig.endpoints.disableUi) {
			await this.generateStaticAssets();
		}

		if (!this.globalConfig.taskRunners.disabled) {
			Container.set(TaskManager, new SingleMainTaskManager());

			// Task runner modules are imported dynamically, only when runners are enabled.
			const { TaskRunnerServer } = await import('@/runners/task-runner-server');
			const taskRunnerServer = Container.get(TaskRunnerServer);
			await taskRunnerServer.start();

			const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
			const runnerProcess = Container.get(TaskRunnerProcess);
			await runnerProcess.start();
		}
	}

	/**
	 * Sets up orchestration for this main instance.
	 *
	 * In `regular` execution mode the instance is simply marked as leader.
	 * Otherwise the orchestration service and pubsub subscriptions are
	 * initialized and, when multi-main setup is enabled, leadership change
	 * handlers are registered.
	 *
	 * @throws FeatureNotLicensedError when multi-main setup is enabled in config
	 * but not covered by the license.
	 */
	async initOrchestration() {
		if (config.getEnv('executions.mode') === 'regular') {
			this.instanceSettings.markAsLeader();
			return;
		}

		if (
			config.getEnv('multiMainSetup.enabled') &&
			!Container.get(License).isMultipleMainInstancesLicensed()
		) {
			throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
		}

		const orchestrationService = Container.get(OrchestrationService);

		await orchestrationService.init();

		Container.get(PubSubHandler).init();

		const subscriber = Container.get(Subscriber);
		await subscriber.subscribe('n8n.commands');
		await subscriber.subscribe('n8n.worker-response');

		if (!orchestrationService.isMultiMainSetupEnabled) return;

		orchestrationService.multiMainSetup
			.on('leader-stepdown', async () => {
				await this.license.reinit(); // to disable renewal
				await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
			})
			.on('leader-takeover', async () => {
				await this.license.reinit(); // to enable renewal
				await this.activeWorkflowManager.addAllTriggerAndPollerBasedWorkflows();
			});
	}

	/**
	 * Starts n8n: applies database-stored settings to config, optionally runs
	 * SQLite `VACUUM`, optionally opens a webhook tunnel, starts the server and
	 * pruning, resumes enqueued executions (regular mode only), activates
	 * workflows, and finally wires up the interactive key handling when attached
	 * to a TTY.
	 */
	async run() {
		const { flags } = await this.parse(Start);

		// Load settings from database and set them to config.
		const databaseSettings = await Container.get(SettingsRepository).findBy({
			loadOnStartup: true,
		});
		databaseSettings.forEach((setting) => {
			// Values that are not valid JSON are used as the raw string.
			config.set(setting.key, jsonParse(setting.value, { fallbackValue: setting.value }));
		});

		const { type: dbType } = this.globalConfig.database;
		if (dbType === 'sqlite') {
			const shouldRunVacuum = this.globalConfig.database.sqlite.executeVacuumOnStartup;
			if (shouldRunVacuum) {
				await Container.get(ExecutionRepository).query('VACUUM;');
			}
		}

		if (flags.tunnel) {
			this.log('\nWaiting for tunnel ...');

			let tunnelSubdomain =
				process.env.N8N_TUNNEL_SUBDOMAIN ?? this.instanceSettings.tunnelSubdomain ?? '';

			if (tunnelSubdomain === '') {
				// When no tunnel subdomain did exist yet create a new random one
				tunnelSubdomain = randomString(24).toLowerCase();

				this.instanceSettings.update({ tunnelSubdomain });
			}

			const { default: localtunnel } = await import('@n8n/localtunnel');
			const { port } = this.globalConfig;

			const webhookTunnel = await localtunnel(port, {
				host: 'https://hooks.n8n.cloud',
				subdomain: tunnelSubdomain,
			});

			process.env.WEBHOOK_URL = `${webhookTunnel.url}/`;
			this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`);
			this.log(
				'IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!',
			);
		}

		await this.server.start();

		Container.get(PruningService).init();

		if (config.getEnv('executions.mode') === 'regular') {
			await this.runEnqueuedExecutions();
		}

		// Start to get active workflows and run their triggers
		await this.activeWorkflowManager.init();

		const editorUrl = Container.get(UrlService).baseUrl;
		this.log(`\nEditor is now accessible via:\n${editorUrl}`);

		// Allow to open n8n editor by pressing "o"
		if (Boolean(process.stdout.isTTY) && process.stdin.setRawMode) {
			process.stdin.setRawMode(true);
			process.stdin.resume();
			process.stdin.setEncoding('utf8');

			if (flags.open) {
				this.openBrowser();
			}
			this.log('\nPress "o" to open in Browser.');
			process.stdin.on('data', (key: string) => {
				if (key === 'o') {
					this.openBrowser();
				} else if (key.charCodeAt(0) === 3) {
					// Ctrl + c got pressed
					void this.onTerminationSignal('SIGINT')();
				} else {
					// Any other key is only echoed back to the terminal
					if (key.charCodeAt(0) === 13) {
						// Enter (carriage return): print a line break
						process.stdout.write('\n');
					} else {
						// Echo the typed key
						process.stdout.write(key);
					}
				}
			});
		}
	}

	/**
	 * Logs the error's stack trace (when present) and exits via `exitWithCrash`.
	 */
	async catch(error: Error) {
		if (error.stack) this.logger.error(error.stack);
		await this.exitWithCrash('Exiting due to an error.', error);
	}

	/**
	 * During startup, we may find executions that had been enqueued at the time of shutdown.
	 *
	 * If so, start running any such executions concurrently up to the concurrency limit, and
	 * enqueue any remaining ones until we have spare concurrency capacity again.
	 */
	private async runEnqueuedExecutions() {
		const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions();

		if (executions.length === 0) return;

		this.logger.debug('[Startup] Found enqueued executions to run', {
			executionIds: executions.map((e) => e.id),
		});

		const ownershipService = Container.get(OwnershipService);
		const workflowRunner = Container.get(WorkflowRunner);

		for (const execution of executions) {
			const project = await ownershipService.getWorkflowProjectCached(execution.workflowId);

			const data: IWorkflowExecutionDataProcess = {
				executionMode: execution.mode,
				executionData: execution.data,
				workflowData: execution.workflowData,
				projectId: project.id,
			};

			Container.get(EventService).emit('execution-started-during-bootup', {
				executionId: execution.id,
			});

			// do not block - each execution either runs concurrently or is queued
			void workflowRunner.run(data, undefined, false, execution.id);
		}
	}
}