diff --git a/docker/images/n8n-custom/Dockerfile b/docker/images/n8n-custom/Dockerfile index a533cdbdab..78eedaa2c3 100644 --- a/docker/images/n8n-custom/Dockerfile +++ b/docker/images/n8n-custom/Dockerfile @@ -34,7 +34,7 @@ COPY docker/images/n8n/docker-entrypoint.sh / # Setup the Task Runner Launcher ARG TARGETPLATFORM ARG LAUNCHER_VERSION=0.1.1 -ENV N8N_RUNNERS_USE_LAUNCHER=true \ +ENV N8N_RUNNERS_MODE=internal_launcher \ N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json # First, download, verify, then extract the launcher binary diff --git a/docker/images/n8n/Dockerfile b/docker/images/n8n/Dockerfile index 08e031cf5f..8a94d0c9ec 100644 --- a/docker/images/n8n/Dockerfile +++ b/docker/images/n8n/Dockerfile @@ -25,7 +25,7 @@ RUN set -eux; \ # Setup the Task Runner Launcher ARG TARGETPLATFORM ARG LAUNCHER_VERSION=0.1.1 -ENV N8N_RUNNERS_USE_LAUNCHER=true \ +ENV N8N_RUNNERS_MODE=internal_launcher \ N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher COPY n8n-task-runners.json /etc/n8n-task-runners.json # First, download, verify, then extract the launcher binary diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 14d1b01d1a..9c88c77c6a 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -1,11 +1,23 @@ import { Config, Env } from '../decorators'; +/** + * Whether to enable task runners and how to run them + * - internal_childprocess: Task runners are run as a child process and launched by n8n + * - internal_launcher: Task runners are run as a child process and launched by n8n using a separate launch program + * - external: Task runners are run as a separate program not launched by n8n + */ +export type TaskRunnerMode = 'internal_childprocess' | 'internal_launcher' | 'external'; + @Config export class TaskRunnersConfig { // Defaults to true for now @Env('N8N_RUNNERS_DISABLED') disabled: boolean = true; + // Defaults to true for now + @Env('N8N_RUNNERS_MODE') + mode: TaskRunnerMode = 'internal_childprocess'; + @Env('N8N_RUNNERS_PATH') path: string = '/runners'; @@ -18,10 +30,7 @@ export class TaskRunnersConfig { /** IP address task runners server should listen on */ @Env('N8N_RUNNERS_SERVER_LISTEN_ADDRESS') - listen_address: string = '127.0.0.1'; - - @Env('N8N_RUNNERS_USE_LAUNCHER') - useLauncher: boolean = false; + listenAddress: string = '127.0.0.1'; @Env('N8N_RUNNERS_LAUNCHER_PATH') launcherPath: string = ''; diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index af40e7a8e1..dc85697076 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -223,11 +223,11 @@ describe('GlobalConfig', () => { }, taskRunners: { disabled: true, + mode: 'internal_childprocess', path: '/runners', authToken: '', - listen_address: '127.0.0.1', + listenAddress: '127.0.0.1', port: 5679, - useLauncher: false, launcherPath: '', launcherRunner: 'javascript', }, diff --git a/packages/@n8n/task-runner/src/authenticator.ts b/packages/@n8n/task-runner/src/authenticator.ts deleted file mode 100644 index 7edb4cadf6..0000000000 --- a/packages/@n8n/task-runner/src/authenticator.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { ApplicationError } from 'n8n-workflow'; -import * as a from 'node:assert/strict'; - -export type AuthOpts = { - n8nUri: string; - authToken: string; -}; - -/** - * Requests a one-time token that can be used to establish a task runner connection - */ -export async function authenticate(opts: AuthOpts) { - try { - const authEndpoint = `http://${opts.n8nUri}/runners/auth`; - const response = await fetch(authEndpoint, { - method: 'POST', - headers: { - // eslint-disable-next-line @typescript-eslint/naming-convention - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - token: opts.authToken, - }), - }); - - if (!response.ok) { - throw new ApplicationError( - `Invalid response status ${response.status}: ${await response.text()}`, - ); - } - - const { data } = (await response.json()) as { data: { token: string } }; - const grantToken = data.token; - a.ok(grantToken); - - return grantToken; - } catch (e) { - console.error(e); - const error = e as Error; - throw new ApplicationError( - `Could not connect to n8n message broker ${opts.n8nUri}: ${error.message}`, - { - cause: error, - }, - ); - } -} diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index 5f856140d9..f5487ba6c2 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -1,7 +1,5 @@ import { ApplicationError, ensureError } from 'n8n-workflow'; -import * as a from 'node:assert/strict'; -import { authenticate } from './authenticator'; import { JsTaskRunner } from './js-task-runner/js-task-runner'; let runner: JsTaskRunner | undefined; @@ -9,22 +7,17 @@ let isShuttingDown = false; type Config = { n8nUri: string; - authToken?: string; - grantToken?: string; + grantToken: string; }; function readAndParseConfig(): Config { - const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN; const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN; - if (!authToken && !grantToken) { - throw new ApplicationError( - 'Missing task runner authentication. Use either N8N_RUNNERS_AUTH_TOKEN or N8N_RUNNERS_GRANT_TOKEN to configure it', - ); + if (!grantToken) { + throw new ApplicationError('Missing N8N_RUNNERS_GRANT_TOKEN environment variable'); } return { n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? '127.0.0.1:5679', - authToken, grantToken, }; } @@ -55,20 +48,10 @@ function createSignalHandler(signal: string) { void (async function start() { const config = readAndParseConfig(); - let grantToken = config.grantToken; - if (!grantToken) { - a.ok(config.authToken); - - grantToken = await authenticate({ - authToken: config.authToken, - n8nUri: config.n8nUri, - }); - } - const wsUrl = `ws://${config.n8nUri}/runners/_ws`; runner = new JsTaskRunner({ wsUrl, - grantToken, + grantToken: config.grantToken, maxConcurrency: 5, allowedBuiltInModules: process.env.NODE_FUNCTION_ALLOW_BUILTIN, allowedExternalModules: process.env.NODE_FUNCTION_ALLOW_EXTERNAL, diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 7865739eec..c8428c1dc9 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -222,15 +222,21 @@ export class Start extends BaseCommand { await this.generateStaticAssets(); } - if (!this.globalConfig.taskRunners.disabled) { + const { taskRunners: taskRunnerConfig } = this.globalConfig; + if (!taskRunnerConfig.disabled) { Container.set(TaskManager, new LocalTaskManager()); const { TaskRunnerServer } = await import('@/runners/task-runner-server'); const taskRunnerServer = Container.get(TaskRunnerServer); await taskRunnerServer.start(); - const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); - const runnerProcess = Container.get(TaskRunnerProcess); - await runnerProcess.start(); + if ( + taskRunnerConfig.mode === 'internal_childprocess' || + taskRunnerConfig.mode === 'internal_launcher' + ) { + const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); + const runnerProcess = Container.get(TaskRunnerProcess); + await runnerProcess.start(); + } } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 96f151f547..d362491bff 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -114,15 +114,21 @@ export class Worker extends BaseCommand { }), ); - if (!this.globalConfig.taskRunners.disabled) { + const { taskRunners: taskRunnerConfig } = this.globalConfig; + if (!taskRunnerConfig.disabled) { Container.set(TaskManager, new LocalTaskManager()); const { TaskRunnerServer } = await import('@/runners/task-runner-server'); const taskRunnerServer = Container.get(TaskRunnerServer); await taskRunnerServer.start(); - const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); - const runnerProcess = Container.get(TaskRunnerProcess); - await runnerProcess.start(); + if ( + taskRunnerConfig.mode === 'internal_childprocess' || + taskRunnerConfig.mode === 'internal_launcher' + ) { + const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); + const runnerProcess = Container.get(TaskRunnerProcess); + await runnerProcess.start(); + } } } diff --git a/packages/cli/src/runners/__tests__/task-runner-process.test.ts b/packages/cli/src/runners/__tests__/task-runner-process.test.ts index b2ad678ee1..1bae991811 100644 --- a/packages/cli/src/runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-process.test.ts @@ -1,4 +1,4 @@ -import { GlobalConfig } from '@n8n/config'; +import { TaskRunnersConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import type { ChildProcess, SpawnOptions } from 'node:child_process'; @@ -19,14 +19,26 @@ const spawnMock = jest.fn(() => require('child_process').spawn = spawnMock; describe('TaskRunnerProcess', () => { - const globalConfig = mockInstance(GlobalConfig); + const runnerConfig = mockInstance(TaskRunnersConfig); + runnerConfig.disabled = false; + runnerConfig.mode = 'internal_childprocess'; const authService = mock(); - const taskRunnerProcess = new TaskRunnerProcess(globalConfig, authService); + const taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService); afterEach(async () => { spawnMock.mockClear(); }); + describe('constructor', () => { + it('should throw if runner mode is external', () => { + runnerConfig.mode = 'external'; + + expect(() => new TaskRunnerProcess(runnerConfig, authService)).toThrow(); + + runnerConfig.mode = 'internal_childprocess'; + }); + }); + describe('start', () => { it('should propagate NODE_FUNCTION_ALLOW_BUILTIN and NODE_FUNCTION_ALLOW_EXTERNAL from env', async () => { jest.spyOn(authService, 'createGrantToken').mockResolvedValue('grantToken'); diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index a3bc118387..1b059bd573 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -1,4 +1,4 @@ -import { GlobalConfig } from '@n8n/config'; +import { TaskRunnersConfig } from '@n8n/config'; import * as a from 'node:assert/strict'; import { spawn } from 'node:child_process'; import * as process from 'node:process'; @@ -28,6 +28,10 @@ export class TaskRunnerProcess { return this._runPromise; } + private get useLauncher() { + return this.runnerConfig.mode === 'internal_launcher'; + } + private process: ChildProcess | null = null; private _runPromise: Promise | null = null; @@ -35,17 +39,22 @@ export class TaskRunnerProcess { private isShuttingDown = false; constructor( - private readonly globalConfig: GlobalConfig, + private readonly runnerConfig: TaskRunnersConfig, private readonly authService: TaskRunnerAuthService, - ) {} + ) { + a.ok( + this.runnerConfig.mode === 'internal_childprocess' || + this.runnerConfig.mode === 'internal_launcher', + ); + } async start() { a.ok(!this.process, 'Task Runner Process already running'); const grantToken = await this.authService.createGrantToken(); - const n8nUri = `127.0.0.1:${this.globalConfig.taskRunners.port}`; - this.process = this.globalConfig.taskRunners.useLauncher + const n8nUri = `127.0.0.1:${this.runnerConfig.port}`; + this.process = this.useLauncher ? this.startLauncher(grantToken, n8nUri) : this.startNode(grantToken, n8nUri); @@ -70,21 +79,17 @@ export class TaskRunnerProcess { } startLauncher(grantToken: string, n8nUri: string) { - return spawn( - this.globalConfig.taskRunners.launcherPath, - ['launch', this.globalConfig.taskRunners.launcherRunner], - { - env: { - PATH: process.env.PATH, - N8N_RUNNERS_GRANT_TOKEN: grantToken, - N8N_RUNNERS_N8N_URI: n8nUri, - NODE_FUNCTION_ALLOW_BUILTIN: process.env.NODE_FUNCTION_ALLOW_BUILTIN, - NODE_FUNCTION_ALLOW_EXTERNAL: process.env.NODE_FUNCTION_ALLOW_EXTERNAL, - // For debug logging if enabled - RUST_LOG: process.env.RUST_LOG, - }, + return spawn(this.runnerConfig.launcherPath, ['launch', this.runnerConfig.launcherRunner], { + env: { + PATH: process.env.PATH, + N8N_RUNNERS_GRANT_TOKEN: grantToken, + N8N_RUNNERS_N8N_URI: n8nUri, + NODE_FUNCTION_ALLOW_BUILTIN: process.env.NODE_FUNCTION_ALLOW_BUILTIN, + NODE_FUNCTION_ALLOW_EXTERNAL: process.env.NODE_FUNCTION_ALLOW_EXTERNAL, + // For debug logging if enabled + RUST_LOG: process.env.RUST_LOG, }, - ); + }); } @OnShutdown() @@ -96,7 +101,7 @@ export class TaskRunnerProcess { this.isShuttingDown = true; // TODO: Timeout & force kill - if (this.globalConfig.taskRunners.useLauncher) { + if (this.useLauncher) { await this.killLauncher(); } else { this.killNode(); @@ -118,9 +123,9 @@ export class TaskRunnerProcess { return; } - const killProcess = spawn(this.globalConfig.taskRunners.launcherPath, [ + const killProcess = spawn(this.runnerConfig.launcherPath, [ 'kill', - this.globalConfig.taskRunners.launcherRunner, + this.runnerConfig.launcherRunner, this.process.pid.toString(), ]); diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts index fc31c100a3..c9d52fc22a 100644 --- a/packages/cli/src/runners/task-runner-server.ts +++ b/packages/cli/src/runners/task-runner-server.ts @@ -88,7 +88,7 @@ export class TaskRunnerServer { this.server = createHttpServer(app); const { - taskRunners: { port, listen_address: address }, + taskRunners: { port, listenAddress: address }, } = this.globalConfig; this.server.on('error', (error: Error & { code: string }) => { diff --git a/packages/cli/test/integration/runners/task-runner-process.test.ts b/packages/cli/test/integration/runners/task-runner-process.test.ts index 4b35e270df..c893add440 100644 --- a/packages/cli/test/integration/runners/task-runner-process.test.ts +++ b/packages/cli/test/integration/runners/task-runner-process.test.ts @@ -1,4 +1,4 @@ -import { GlobalConfig } from '@n8n/config'; +import { TaskRunnersConfig } from '@n8n/config'; import Container from 'typedi'; import { TaskRunnerService } from '@/runners/runner-ws-server'; @@ -9,9 +9,11 @@ import { retryUntil } from '@test-integration/retry-until'; describe('TaskRunnerProcess', () => { const authToken = 'token'; - const globalConfig = Container.get(GlobalConfig); - globalConfig.taskRunners.authToken = authToken; - globalConfig.taskRunners.port = 0; // Use any port + const runnerConfig = Container.get(TaskRunnersConfig); + runnerConfig.disabled = false; + runnerConfig.mode = 'internal_childprocess'; + runnerConfig.authToken = authToken; + runnerConfig.port = 0; // Use any port const taskRunnerServer = Container.get(TaskRunnerServer); const runnerProcess = Container.get(TaskRunnerProcess); @@ -26,7 +28,7 @@ describe('TaskRunnerProcess', () => { beforeAll(async () => { await taskRunnerServer.start(); // Set the port to the actually used port - globalConfig.taskRunners.port = taskRunnerServer.port; + runnerConfig.port = taskRunnerServer.port; }); afterAll(async () => { @@ -100,7 +102,7 @@ describe('TaskRunnerProcess', () => { }); it('should launch runner directly if not using a launcher', async () => { - globalConfig.taskRunners.useLauncher = false; + runnerConfig.mode = 'internal_childprocess'; await runnerProcess.start(); @@ -109,18 +111,18 @@ describe('TaskRunnerProcess', () => { }); it('should use a launcher if configured', async () => { - globalConfig.taskRunners.useLauncher = true; - globalConfig.taskRunners.launcherPath = 'node'; + runnerConfig.mode = 'internal_launcher'; + runnerConfig.launcherPath = 'node'; await runnerProcess.start(); expect(startLauncherSpy).toBeCalledTimes(1); expect(startNodeSpy).toBeCalledTimes(0); - globalConfig.taskRunners.useLauncher = false; + runnerConfig.mode = 'internal_childprocess'; }); it('should kill the process directly if not using a launcher', async () => { - globalConfig.taskRunners.useLauncher = false; + runnerConfig.mode = 'internal_childprocess'; await runnerProcess.start(); await runnerProcess.stop(); @@ -130,14 +132,14 @@ describe('TaskRunnerProcess', () => { }); it('should kill the process using a launcher if configured', async () => { - globalConfig.taskRunners.useLauncher = true; - globalConfig.taskRunners.launcherPath = 'node'; + runnerConfig.mode = 'internal_launcher'; + runnerConfig.launcherPath = 'node'; await runnerProcess.start(); await runnerProcess.stop(); expect(killLauncherSpy).toBeCalledTimes(1); expect(killNodeSpy).toBeCalledTimes(0); - globalConfig.taskRunners.useLauncher = false; + runnerConfig.mode = 'internal_childprocess'; }); });