n8n/packages/cli/src/commands/start.ts

413 lines
14 KiB
TypeScript

/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Flags, type Config } from '@oclif/core';
import glob from 'fast-glob';
import { createReadStream, createWriteStream, existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { jsonParse, randomString, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import path from 'path';
import replaceStream from 'replacestream';
import { pipeline } from 'stream/promises';
import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import config from '@/config';
import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { SettingsRepository } from '@/databases/repositories/settings.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { Server } from '@/server';
import { OrchestrationService } from '@/services/orchestration.service';
import { OwnershipService } from '@/services/ownership.service';
import { PruningService } from '@/services/pruning.service';
import { UrlService } from '@/services/url.service';
import { WaitTracker } from '@/wait-tracker';
import { WorkflowRunner } from '@/workflow-runner';
import { BaseCommand } from './base-command';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
export class Start extends BaseCommand {
/** oclif command metadata: one-line summary of the command. */
static description = 'Starts n8n. Makes Web-UI available and starts active workflows';
/** oclif command metadata: example invocations. */
static examples = [
'$ n8n start',
'$ n8n start --tunnel',
'$ n8n start -o',
'$ n8n start --tunnel -o',
];
/** Command-line flags accepted by `n8n start`; parsed in `init()` and `run()`. */
static flags = {
help: Flags.help({ char: 'h' }),
// `-o` / `--open`: read in run() to open the editor once startup has finished
open: Flags.boolean({
char: 'o',
description: 'opens the UI automatically in browser',
}),
// `--tunnel`: read in run() to expose webhooks through a localtunnel connection
tunnel: Flags.boolean({
description:
'runs the webhooks via a hooks.n8n.cloud tunnel server. Use only for testing and development!',
}),
// `--reinstallMissingPackages`: handled in init(), where it is reported as deprecated
reinstallMissingPackages: Flags.boolean({
description:
'Attempts to self heal n8n if packages with nodes are missing. Might drastically increase startup times.',
}),
};
// Assigned in init() from the DI container (after `super.init()` has run).
protected activeWorkflowManager: ActiveWorkflowManager;
// Server instance resolved from the DI container; started in run().
protected server = Container.get(Server);
// NOTE(review): presumably tells BaseCommand to load community packages during init — confirm in base-command.
override needsCommunityPackages = true;
/**
 * @param argv raw command-line arguments, forwarded to `BaseCommand`
 * @param cmdConfig oclif configuration, forwarded to `BaseCommand`
 */
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
// NOTE(review): presumably sets `this.queueModeId`, which init() logs as the host ID — confirm in BaseCommand.
this.setInstanceQueueModeId();
}
/**
 * Launches the editor URL in the browser.
 *
 * Fire-and-forget: if launching fails, the URL is logged so that the user
 * can open it by hand.
 */
private openBrowser() {
	const { baseUrl } = Container.get(UrlService);

	const reportFailure = () => {
		this.logger.info(
			`\nWas not able to open URL in browser. Please open manually by visiting:\n${baseUrl}\n`,
		);
	};

	open(baseUrl, { wait: true }).catch(reportFailure);
}
/**
 * Shuts n8n down gracefully, e.g. so that webhooks registered with
 * third-party services get removed.
 *
 * Any failure during the shutdown sequence is routed to `exitWithCrash`;
 * otherwise the process exits via `exitSuccessFully`.
 */
async stopProcess() {
	this.logger.info('\nStopping n8n...');

	try {
		const { activeWorkflowManager } = this;

		// Give up on workflows that are still queued for a (re)activation attempt
		activeWorkflowManager.removeAllQueuedWorkflowActivations();

		Container.get(WaitTracker).stopTracking();

		await this.externalHooks?.run('n8n.stop', []);

		await activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();

		const orchestration = Container.get(OrchestrationService);
		if (orchestration.isMultiMainSetupEnabled) {
			await orchestration.shutdown();
		}

		Container.get(EventService).emit('instance-stopped');

		await Container.get(ActiveExecutions).shutdown();

		// The event bus goes last
		await Container.get(MessageEventBus).close();
	} catch (error) {
		await this.exitWithCrash('There was an error shutting down n8n.', error);
	}

	await this.exitSuccessFully();
}
/**
 * Writes processed copies of the editor UI's `index.html` and of every
 * `.js`/`.css` file found under `EDITOR_UI_DIST_DIR` into the static cache
 * directory.
 *
 * While copying, the base-path placeholders (plain, URL-encoded and
 * double-URL-encoded) and `/static/` are rewritten to the configured path.
 * For `index.html`, the REST endpoint placeholder is also filled in and one
 * `<script>` tag per external frontend hook URL is appended after `</title>`.
 */
private async generateStaticAssets() {
	const basePath = this.globalConfig.path;

	// The setting holds `;`-separated URLs; each one becomes a <script> tag
	const hookUrlList = config.getEnv('externalFrontendHooksUrls');
	const hookScriptTags = hookUrlList
		? hookUrlList
				.split(';')
				.map((url) => `<script src="${url}"></script>`)
				.join('')
		: '';

	const titleEnd = '</title>';
	const { staticCacheDir } = this.instanceSettings;

	// Case-sensitive search-and-replace transform
	const swap = (needle: string, replacement: string) =>
		replaceStream(needle, replacement, { ignoreCase: false });

	const rewriteAsset = async (relativePath: string) => {
		const sourcePath = path.join(EDITOR_UI_DIST_DIR, relativePath);
		const isRewritable = /(index\.html)|.*\.(js|css)/.test(sourcePath);
		if (!isRewritable || !existsSync(sourcePath)) return;

		const targetPath = path.join(staticCacheDir, relativePath);
		await mkdir(path.dirname(targetPath), { recursive: true });

		const stages = [
			createReadStream(sourcePath, 'utf-8'),
			swap('/{{BASE_PATH}}/', basePath),
			swap('/%7B%7BBASE_PATH%7D%7D/', basePath),
			swap('/%257B%257BBASE_PATH%257D%257D/', basePath),
			swap('/static/', basePath + 'static/'),
		];

		const isIndexPage = sourcePath.endsWith('index.html');
		if (isIndexPage) {
			stages.push(swap('{{REST_ENDPOINT}}', this.globalConfig.endpoints.rest));
			stages.push(swap(titleEnd, titleEnd + hookScriptTags));
		}

		stages.push(createWriteStream(targetPath, 'utf-8'));
		await pipeline(stages);
	};

	// index.html is processed on its own first, then all scripts and stylesheets in parallel
	await rewriteAsset('index.html');
	const assetPaths = await glob('**/*.{css,js}', { cwd: EDITOR_UI_DIST_DIR });
	await Promise.all(assetPaths.map(async (assetPath) => await rewriteAsset(assetPath)));
}
/**
 * Prepares the main process before `run()`.
 *
 * Handles the deprecated `--reinstallMissingPackages` flag, runs the base
 * command initialization, then initializes license, orchestration, wait
 * tracker, binary data, data deduplication, external hooks, external secrets
 * and workflow history. Afterwards it generates the static editor assets
 * (unless the UI is disabled) and starts the task runner server and process
 * (unless task runners are disabled).
 */
async init() {
await this.initCrashJournal();
this.logger.info('Initializing n8n process');
// In queue mode, log (at debug level) that this main runs in scaling mode and its host ID
if (config.getEnv('executions.mode') === 'queue') {
const scopedLogger = this.logger.withScope('scaling');
scopedLogger.debug('Starting main instance in scaling mode');
scopedLogger.debug(`Host ID: ${this.queueModeId}`);
}
const { flags } = await this.parse(Start);
const { communityPackages } = this.globalConfig.nodes;
// cli flag overrides the config env variable
if (flags.reinstallMissingPackages) {
if (communityPackages.enabled) {
this.logger.warn(
'`--reinstallMissingPackages` is deprecated: Please use the env variable `N8N_REINSTALL_MISSING_PACKAGES` instead',
);
// Set before `super.init()` runs — NOTE(review): presumably so that the base initialization sees the override; confirm in BaseCommand
communityPackages.reinstallMissing = true;
} else {
this.logger.warn(
'`--reinstallMissingPackages` was passed, but community packages are disabled',
);
}
}
await super.init();
// Resolved only after the base initialization has completed
this.activeWorkflowManager = Container.get(ActiveWorkflowManager);
await this.initLicense();
await this.initOrchestration();
this.logger.debug('Orchestration init complete');
// Only the leader warns about disabled license auto-renewal
if (!config.getEnv('license.autoRenewEnabled') && this.instanceSettings.isLeader) {
this.logger.warn(
'Automatic license renewal is disabled. The license will not renew automatically, and access to licensed features may be lost!',
);
}
Container.get(WaitTracker).init();
this.logger.debug('Wait tracker init complete');
await this.initBinaryDataService();
this.logger.debug('Binary data service init complete');
await this.initDataDeduplicationService();
this.logger.debug('Data deduplication service init complete');
await this.initExternalHooks();
this.logger.debug('External hooks init complete');
await this.initExternalSecrets();
this.logger.debug('External secrets init complete');
this.initWorkflowHistory();
this.logger.debug('Workflow history init complete');
// The static editor assets are only needed when the UI is served
if (!this.globalConfig.endpoints.disableUi) {
await this.generateStaticAssets();
}
if (!this.globalConfig.taskRunners.disabled) {
// Register the local task manager as the `TaskManager` implementation in the container
Container.set(TaskManager, new LocalTaskManager());
// The runner modules are imported lazily, only when task runners are enabled
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();
}
}
/**
 * Sets up orchestration for this main instance.
 *
 * In regular mode the instance simply marks itself as leader. Otherwise the
 * orchestration service and pubsub handler are initialized and the command
 * and worker-response channels are subscribed to. With multi-main enabled,
 * leadership changes re-initialize the license and add or remove all
 * trigger- and poller-based workflows.
 *
 * @throws FeatureNotLicensedError if multi-main is enabled in the config but not licensed
 */
async initOrchestration() {
	const isRegularMode = config.getEnv('executions.mode') === 'regular';
	if (isRegularMode) {
		this.instanceSettings.markAsLeader();
		return;
	}

	const multiMainRequested = config.getEnv('multiMainSetup.enabled');
	if (multiMainRequested && !Container.get(License).isMultipleMainInstancesLicensed()) {
		throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
	}

	const orchestrationService = Container.get(OrchestrationService);
	await orchestrationService.init();

	Container.get(PubSubHandler).init();

	const subscriber = Container.get(Subscriber);
	await subscriber.subscribe('n8n.commands');
	await subscriber.subscribe('n8n.worker-response');

	this.logger.withScope('scaling').debug('Pubsub setup completed');

	if (orchestrationService.isMultiMainSetupEnabled) {
		const onLeaderStepdown = async () => {
			await this.license.reinit(); // to disable renewal
			await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
		};

		const onLeaderTakeover = async () => {
			await this.license.reinit(); // to enable renewal
			await this.activeWorkflowManager.addAllTriggerAndPollerBasedWorkflows();
		};

		orchestrationService.multiMainSetup
			.on('leader-stepdown', onLeaderStepdown)
			.on('leader-takeover', onLeaderTakeover);
	}
}
/**
 * Runs the main n8n process.
 *
 * Steps, in order:
 * 1. apply the database settings flagged `loadOnStartup` to the config,
 * 2. run `VACUUM` on SQLite if configured to do so,
 * 3. with `--tunnel`, open a webhook tunnel and publish its URL via `WEBHOOK_URL`,
 * 4. start the server and the pruning service,
 * 5. in regular mode, resume executions that were still enqueued,
 * 6. activate workflows and print the editor URL,
 * 7. when attached to a terminal, listen for the "o" (open browser) and
 *    Ctrl+C (shutdown) keys.
 */
async run() {
	const { flags } = await this.parse(Start);

	// Load settings from database and set them to config.
	const databaseSettings = await Container.get(SettingsRepository).findBy({
		loadOnStartup: true,
	});
	databaseSettings.forEach((setting) => {
		config.set(setting.key, jsonParse(setting.value, { fallbackValue: setting.value }));
	});

	const { type: dbType } = this.globalConfig.database;
	if (dbType === 'sqlite') {
		const shouldRunVacuum = this.globalConfig.database.sqlite.executeVacuumOnStartup;
		if (shouldRunVacuum) {
			await Container.get(ExecutionRepository).query('VACUUM;');
		}
	}

	if (flags.tunnel) {
		this.log('\nWaiting for tunnel ...');

		// An env variable that is set but empty (e.g. `N8N_TUNNEL_SUBDOMAIN=`) counts as
		// unset. Otherwise it would shadow the persisted subdomain, and a fresh random
		// one would be generated and persisted on every start.
		const envSubdomain = process.env.N8N_TUNNEL_SUBDOMAIN;
		let tunnelSubdomain =
			(envSubdomain === '' ? undefined : envSubdomain) ??
			this.instanceSettings.tunnelSubdomain ??
			'';

		if (tunnelSubdomain === '') {
			// No tunnel subdomain exists yet: create a random one and persist it
			tunnelSubdomain = randomString(24).toLowerCase();
			this.instanceSettings.update({ tunnelSubdomain });
		}

		const { default: localtunnel } = await import('@n8n/localtunnel');
		const { port } = this.globalConfig;

		const webhookTunnel = await localtunnel(port, {
			host: 'https://hooks.n8n.cloud',
			subdomain: tunnelSubdomain,
		});

		process.env.WEBHOOK_URL = `${webhookTunnel.url}/`;
		this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`);
		this.log(
			'IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!',
		);
	}

	await this.server.start();

	Container.get(PruningService).init();

	if (config.getEnv('executions.mode') === 'regular') {
		await this.runEnqueuedExecutions();
	}

	// Start to get active workflows and run their triggers
	await this.activeWorkflowManager.init();

	const editorUrl = Container.get(UrlService).baseUrl;
	this.log(`\nEditor is now accessible via:\n${editorUrl}`);

	// Allow to open n8n editor by pressing "o"
	if (Boolean(process.stdout.isTTY) && process.stdin.setRawMode) {
		// Raw mode delivers every key press individually instead of line by line
		process.stdin.setRawMode(true);
		process.stdin.resume();
		process.stdin.setEncoding('utf8');

		if (flags.open) {
			this.openBrowser();
		}
		this.log('\nPress "o" to open in Browser.');

		process.stdin.on('data', (key: string) => {
			const keyCode = key.charCodeAt(0);

			if (key === 'o') {
				this.openBrowser();
			} else if (keyCode === 3) {
				// Ctrl + c got pressed
				void this.onTerminationSignal('SIGINT')();
			} else if (keyCode === 13) {
				// Enter: move the terminal to a new line
				process.stdout.write('\n');
			} else {
				// Any other key is echoed back to the terminal
				process.stdout.write(key);
			}
		});
	}
}
/**
 * Error handler for the command: logs the stack trace when one is
 * available, then exits through `exitWithCrash`.
 */
async catch(error: Error) {
	const { stack } = error;
	if (stack) {
		this.logger.error(stack);
	}

	await this.exitWithCrash('Exiting due to an error.', error);
}
/**
 * Resumes executions that were still enqueued when the instance last shut down.
 *
 * Every such execution is handed to the workflow runner without waiting for
 * it to finish: it either starts right away, up to the concurrency limit, or
 * is queued again until capacity frees up.
 */
private async runEnqueuedExecutions() {
	const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions();

	if (executions.length === 0) return;

	this.logger.debug('[Startup] Found enqueued executions to run', {
		executionIds: executions.map(({ id }) => id),
	});

	const ownershipService = Container.get(OwnershipService);
	const workflowRunner = Container.get(WorkflowRunner);

	for (const execution of executions) {
		const { id: projectId } = await ownershipService.getWorkflowProjectCached(
			execution.workflowId,
		);

		const runData: IWorkflowExecutionDataProcess = {
			executionMode: execution.mode,
			executionData: execution.data,
			workflowData: execution.workflowData,
			projectId,
		};

		Container.get(EventService).emit('execution-started-during-bootup', {
			executionId: execution.id,
		});

		// Deliberately not awaited: the runner either starts or queues the execution
		void workflowRunner.run(runData, undefined, false, execution.id);
	}
}
}