feat: Enable running task runners externally (no-changelog) (#11319)

This commit is contained in:
Tomi Turtiainen 2024-10-22 16:23:59 +03:00 committed by GitHub
parent 216b119350
commit 8404282046
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 99 additions and 123 deletions

View file

@ -34,7 +34,7 @@ COPY docker/images/n8n/docker-entrypoint.sh /
# Setup the Task Runner Launcher # Setup the Task Runner Launcher
ARG TARGETPLATFORM ARG TARGETPLATFORM
ARG LAUNCHER_VERSION=0.1.1 ARG LAUNCHER_VERSION=0.1.1
ENV N8N_RUNNERS_USE_LAUNCHER=true \ ENV N8N_RUNNERS_MODE=internal_launcher \
N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher
COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json
# First, download, verify, then extract the launcher binary # First, download, verify, then extract the launcher binary

View file

@ -25,7 +25,7 @@ RUN set -eux; \
# Setup the Task Runner Launcher # Setup the Task Runner Launcher
ARG TARGETPLATFORM ARG TARGETPLATFORM
ARG LAUNCHER_VERSION=0.1.1 ARG LAUNCHER_VERSION=0.1.1
ENV N8N_RUNNERS_USE_LAUNCHER=true \ ENV N8N_RUNNERS_MODE=internal_launcher \
N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher
COPY n8n-task-runners.json /etc/n8n-task-runners.json COPY n8n-task-runners.json /etc/n8n-task-runners.json
# First, download, verify, then extract the launcher binary # First, download, verify, then extract the launcher binary

View file

@ -1,11 +1,23 @@
import { Config, Env } from '../decorators'; import { Config, Env } from '../decorators';
/**
* Whether to enable task runners and how to run them
* - internal_childprocess: Task runners are run as a child process and launched by n8n
* - internal_launcher: Task runners are run as a child process and launched by n8n using a separate launch program
* - external: Task runners are run as a separate program not launched by n8n
*/
export type TaskRunnerMode = 'internal_childprocess' | 'internal_launcher' | 'external';
@Config @Config
export class TaskRunnersConfig { export class TaskRunnersConfig {
// Defaults to true for now // Defaults to true for now
@Env('N8N_RUNNERS_DISABLED') @Env('N8N_RUNNERS_DISABLED')
disabled: boolean = true; disabled: boolean = true;
// Defaults to true for now
@Env('N8N_RUNNERS_MODE')
mode: TaskRunnerMode = 'internal_childprocess';
@Env('N8N_RUNNERS_PATH') @Env('N8N_RUNNERS_PATH')
path: string = '/runners'; path: string = '/runners';
@ -18,10 +30,7 @@ export class TaskRunnersConfig {
/** IP address task runners server should listen on */ /** IP address task runners server should listen on */
@Env('N8N_RUNNERS_SERVER_LISTEN_ADDRESS') @Env('N8N_RUNNERS_SERVER_LISTEN_ADDRESS')
listen_address: string = '127.0.0.1'; listenAddress: string = '127.0.0.1';
@Env('N8N_RUNNERS_USE_LAUNCHER')
useLauncher: boolean = false;
@Env('N8N_RUNNERS_LAUNCHER_PATH') @Env('N8N_RUNNERS_LAUNCHER_PATH')
launcherPath: string = ''; launcherPath: string = '';

View file

@ -223,11 +223,11 @@ describe('GlobalConfig', () => {
}, },
taskRunners: { taskRunners: {
disabled: true, disabled: true,
mode: 'internal_childprocess',
path: '/runners', path: '/runners',
authToken: '', authToken: '',
listen_address: '127.0.0.1', listenAddress: '127.0.0.1',
port: 5679, port: 5679,
useLauncher: false,
launcherPath: '', launcherPath: '',
launcherRunner: 'javascript', launcherRunner: 'javascript',
}, },

View file

@ -1,47 +0,0 @@
import { ApplicationError } from 'n8n-workflow';
import * as a from 'node:assert/strict';
export type AuthOpts = {
n8nUri: string;
authToken: string;
};
/**
* Requests a one-time token that can be used to establish a task runner connection
*/
export async function authenticate(opts: AuthOpts) {
try {
const authEndpoint = `http://${opts.n8nUri}/runners/auth`;
const response = await fetch(authEndpoint, {
method: 'POST',
headers: {
// eslint-disable-next-line @typescript-eslint/naming-convention
'Content-Type': 'application/json',
},
body: JSON.stringify({
token: opts.authToken,
}),
});
if (!response.ok) {
throw new ApplicationError(
`Invalid response status ${response.status}: ${await response.text()}`,
);
}
const { data } = (await response.json()) as { data: { token: string } };
const grantToken = data.token;
a.ok(grantToken);
return grantToken;
} catch (e) {
console.error(e);
const error = e as Error;
throw new ApplicationError(
`Could not connect to n8n message broker ${opts.n8nUri}: ${error.message}`,
{
cause: error,
},
);
}
}

View file

@ -1,7 +1,5 @@
import { ApplicationError, ensureError } from 'n8n-workflow'; import { ApplicationError, ensureError } from 'n8n-workflow';
import * as a from 'node:assert/strict';
import { authenticate } from './authenticator';
import { JsTaskRunner } from './js-task-runner/js-task-runner'; import { JsTaskRunner } from './js-task-runner/js-task-runner';
let runner: JsTaskRunner | undefined; let runner: JsTaskRunner | undefined;
@ -9,22 +7,17 @@ let isShuttingDown = false;
type Config = { type Config = {
n8nUri: string; n8nUri: string;
authToken?: string; grantToken: string;
grantToken?: string;
}; };
function readAndParseConfig(): Config { function readAndParseConfig(): Config {
const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN;
const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN; const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN;
if (!authToken && !grantToken) { if (!grantToken) {
throw new ApplicationError( throw new ApplicationError('Missing N8N_RUNNERS_GRANT_TOKEN environment variable');
'Missing task runner authentication. Use either N8N_RUNNERS_AUTH_TOKEN or N8N_RUNNERS_GRANT_TOKEN to configure it',
);
} }
return { return {
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? '127.0.0.1:5679', n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? '127.0.0.1:5679',
authToken,
grantToken, grantToken,
}; };
} }
@ -55,20 +48,10 @@ function createSignalHandler(signal: string) {
void (async function start() { void (async function start() {
const config = readAndParseConfig(); const config = readAndParseConfig();
let grantToken = config.grantToken;
if (!grantToken) {
a.ok(config.authToken);
grantToken = await authenticate({
authToken: config.authToken,
n8nUri: config.n8nUri,
});
}
const wsUrl = `ws://${config.n8nUri}/runners/_ws`; const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
runner = new JsTaskRunner({ runner = new JsTaskRunner({
wsUrl, wsUrl,
grantToken, grantToken: config.grantToken,
maxConcurrency: 5, maxConcurrency: 5,
allowedBuiltInModules: process.env.NODE_FUNCTION_ALLOW_BUILTIN, allowedBuiltInModules: process.env.NODE_FUNCTION_ALLOW_BUILTIN,
allowedExternalModules: process.env.NODE_FUNCTION_ALLOW_EXTERNAL, allowedExternalModules: process.env.NODE_FUNCTION_ALLOW_EXTERNAL,

View file

@ -222,17 +222,23 @@ export class Start extends BaseCommand {
await this.generateStaticAssets(); await this.generateStaticAssets();
} }
if (!this.globalConfig.taskRunners.disabled) { const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (!taskRunnerConfig.disabled) {
Container.set(TaskManager, new LocalTaskManager()); Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server'); const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer); const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start(); await taskRunnerServer.start();
if (
taskRunnerConfig.mode === 'internal_childprocess' ||
taskRunnerConfig.mode === 'internal_launcher'
) {
const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess); const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start(); await runnerProcess.start();
} }
} }
}
async initOrchestration() { async initOrchestration() {
if (config.getEnv('executions.mode') === 'regular') { if (config.getEnv('executions.mode') === 'regular') {

View file

@ -114,17 +114,23 @@ export class Worker extends BaseCommand {
}), }),
); );
if (!this.globalConfig.taskRunners.disabled) { const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (!taskRunnerConfig.disabled) {
Container.set(TaskManager, new LocalTaskManager()); Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server'); const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer); const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start(); await taskRunnerServer.start();
if (
taskRunnerConfig.mode === 'internal_childprocess' ||
taskRunnerConfig.mode === 'internal_launcher'
) {
const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess); const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start(); await runnerProcess.start();
} }
} }
}
async initEventBus() { async initEventBus() {
await Container.get(MessageEventBus).initialize({ await Container.get(MessageEventBus).initialize({

View file

@ -1,4 +1,4 @@
import { GlobalConfig } from '@n8n/config'; import { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { ChildProcess, SpawnOptions } from 'node:child_process'; import type { ChildProcess, SpawnOptions } from 'node:child_process';
@ -19,14 +19,26 @@ const spawnMock = jest.fn(() =>
require('child_process').spawn = spawnMock; require('child_process').spawn = spawnMock;
describe('TaskRunnerProcess', () => { describe('TaskRunnerProcess', () => {
const globalConfig = mockInstance(GlobalConfig); const runnerConfig = mockInstance(TaskRunnersConfig);
runnerConfig.disabled = false;
runnerConfig.mode = 'internal_childprocess';
const authService = mock<TaskRunnerAuthService>(); const authService = mock<TaskRunnerAuthService>();
const taskRunnerProcess = new TaskRunnerProcess(globalConfig, authService); const taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService);
afterEach(async () => { afterEach(async () => {
spawnMock.mockClear(); spawnMock.mockClear();
}); });
describe('constructor', () => {
it('should throw if runner mode is external', () => {
runnerConfig.mode = 'external';
expect(() => new TaskRunnerProcess(runnerConfig, authService)).toThrow();
runnerConfig.mode = 'internal_childprocess';
});
});
describe('start', () => { describe('start', () => {
it('should propagate NODE_FUNCTION_ALLOW_BUILTIN and NODE_FUNCTION_ALLOW_EXTERNAL from env', async () => { it('should propagate NODE_FUNCTION_ALLOW_BUILTIN and NODE_FUNCTION_ALLOW_EXTERNAL from env', async () => {
jest.spyOn(authService, 'createGrantToken').mockResolvedValue('grantToken'); jest.spyOn(authService, 'createGrantToken').mockResolvedValue('grantToken');

View file

@ -1,4 +1,4 @@
import { GlobalConfig } from '@n8n/config'; import { TaskRunnersConfig } from '@n8n/config';
import * as a from 'node:assert/strict'; import * as a from 'node:assert/strict';
import { spawn } from 'node:child_process'; import { spawn } from 'node:child_process';
import * as process from 'node:process'; import * as process from 'node:process';
@ -28,6 +28,10 @@ export class TaskRunnerProcess {
return this._runPromise; return this._runPromise;
} }
private get useLauncher() {
return this.runnerConfig.mode === 'internal_launcher';
}
private process: ChildProcess | null = null; private process: ChildProcess | null = null;
private _runPromise: Promise<void> | null = null; private _runPromise: Promise<void> | null = null;
@ -35,17 +39,22 @@ export class TaskRunnerProcess {
private isShuttingDown = false; private isShuttingDown = false;
constructor( constructor(
private readonly globalConfig: GlobalConfig, private readonly runnerConfig: TaskRunnersConfig,
private readonly authService: TaskRunnerAuthService, private readonly authService: TaskRunnerAuthService,
) {} ) {
a.ok(
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher',
);
}
async start() { async start() {
a.ok(!this.process, 'Task Runner Process already running'); a.ok(!this.process, 'Task Runner Process already running');
const grantToken = await this.authService.createGrantToken(); const grantToken = await this.authService.createGrantToken();
const n8nUri = `127.0.0.1:${this.globalConfig.taskRunners.port}`; const n8nUri = `127.0.0.1:${this.runnerConfig.port}`;
this.process = this.globalConfig.taskRunners.useLauncher this.process = this.useLauncher
? this.startLauncher(grantToken, n8nUri) ? this.startLauncher(grantToken, n8nUri)
: this.startNode(grantToken, n8nUri); : this.startNode(grantToken, n8nUri);
@ -70,10 +79,7 @@ export class TaskRunnerProcess {
} }
startLauncher(grantToken: string, n8nUri: string) { startLauncher(grantToken: string, n8nUri: string) {
return spawn( return spawn(this.runnerConfig.launcherPath, ['launch', this.runnerConfig.launcherRunner], {
this.globalConfig.taskRunners.launcherPath,
['launch', this.globalConfig.taskRunners.launcherRunner],
{
env: { env: {
PATH: process.env.PATH, PATH: process.env.PATH,
N8N_RUNNERS_GRANT_TOKEN: grantToken, N8N_RUNNERS_GRANT_TOKEN: grantToken,
@ -83,8 +89,7 @@ export class TaskRunnerProcess {
// For debug logging if enabled // For debug logging if enabled
RUST_LOG: process.env.RUST_LOG, RUST_LOG: process.env.RUST_LOG,
}, },
}, });
);
} }
@OnShutdown() @OnShutdown()
@ -96,7 +101,7 @@ export class TaskRunnerProcess {
this.isShuttingDown = true; this.isShuttingDown = true;
// TODO: Timeout & force kill // TODO: Timeout & force kill
if (this.globalConfig.taskRunners.useLauncher) { if (this.useLauncher) {
await this.killLauncher(); await this.killLauncher();
} else { } else {
this.killNode(); this.killNode();
@ -118,9 +123,9 @@ export class TaskRunnerProcess {
return; return;
} }
const killProcess = spawn(this.globalConfig.taskRunners.launcherPath, [ const killProcess = spawn(this.runnerConfig.launcherPath, [
'kill', 'kill',
this.globalConfig.taskRunners.launcherRunner, this.runnerConfig.launcherRunner,
this.process.pid.toString(), this.process.pid.toString(),
]); ]);

View file

@ -88,7 +88,7 @@ export class TaskRunnerServer {
this.server = createHttpServer(app); this.server = createHttpServer(app);
const { const {
taskRunners: { port, listen_address: address }, taskRunners: { port, listenAddress: address },
} = this.globalConfig; } = this.globalConfig;
this.server.on('error', (error: Error & { code: string }) => { this.server.on('error', (error: Error & { code: string }) => {

View file

@ -1,4 +1,4 @@
import { GlobalConfig } from '@n8n/config'; import { TaskRunnersConfig } from '@n8n/config';
import Container from 'typedi'; import Container from 'typedi';
import { TaskRunnerService } from '@/runners/runner-ws-server'; import { TaskRunnerService } from '@/runners/runner-ws-server';
@ -9,9 +9,11 @@ import { retryUntil } from '@test-integration/retry-until';
describe('TaskRunnerProcess', () => { describe('TaskRunnerProcess', () => {
const authToken = 'token'; const authToken = 'token';
const globalConfig = Container.get(GlobalConfig); const runnerConfig = Container.get(TaskRunnersConfig);
globalConfig.taskRunners.authToken = authToken; runnerConfig.disabled = false;
globalConfig.taskRunners.port = 0; // Use any port runnerConfig.mode = 'internal_childprocess';
runnerConfig.authToken = authToken;
runnerConfig.port = 0; // Use any port
const taskRunnerServer = Container.get(TaskRunnerServer); const taskRunnerServer = Container.get(TaskRunnerServer);
const runnerProcess = Container.get(TaskRunnerProcess); const runnerProcess = Container.get(TaskRunnerProcess);
@ -26,7 +28,7 @@ describe('TaskRunnerProcess', () => {
beforeAll(async () => { beforeAll(async () => {
await taskRunnerServer.start(); await taskRunnerServer.start();
// Set the port to the actually used port // Set the port to the actually used port
globalConfig.taskRunners.port = taskRunnerServer.port; runnerConfig.port = taskRunnerServer.port;
}); });
afterAll(async () => { afterAll(async () => {
@ -100,7 +102,7 @@ describe('TaskRunnerProcess', () => {
}); });
it('should launch runner directly if not using a launcher', async () => { it('should launch runner directly if not using a launcher', async () => {
globalConfig.taskRunners.useLauncher = false; runnerConfig.mode = 'internal_childprocess';
await runnerProcess.start(); await runnerProcess.start();
@ -109,18 +111,18 @@ describe('TaskRunnerProcess', () => {
}); });
it('should use a launcher if configured', async () => { it('should use a launcher if configured', async () => {
globalConfig.taskRunners.useLauncher = true; runnerConfig.mode = 'internal_launcher';
globalConfig.taskRunners.launcherPath = 'node'; runnerConfig.launcherPath = 'node';
await runnerProcess.start(); await runnerProcess.start();
expect(startLauncherSpy).toBeCalledTimes(1); expect(startLauncherSpy).toBeCalledTimes(1);
expect(startNodeSpy).toBeCalledTimes(0); expect(startNodeSpy).toBeCalledTimes(0);
globalConfig.taskRunners.useLauncher = false; runnerConfig.mode = 'internal_childprocess';
}); });
it('should kill the process directly if not using a launcher', async () => { it('should kill the process directly if not using a launcher', async () => {
globalConfig.taskRunners.useLauncher = false; runnerConfig.mode = 'internal_childprocess';
await runnerProcess.start(); await runnerProcess.start();
await runnerProcess.stop(); await runnerProcess.stop();
@ -130,14 +132,14 @@ describe('TaskRunnerProcess', () => {
}); });
it('should kill the process using a launcher if configured', async () => { it('should kill the process using a launcher if configured', async () => {
globalConfig.taskRunners.useLauncher = true; runnerConfig.mode = 'internal_launcher';
globalConfig.taskRunners.launcherPath = 'node'; runnerConfig.launcherPath = 'node';
await runnerProcess.start(); await runnerProcess.start();
await runnerProcess.stop(); await runnerProcess.stop();
expect(killLauncherSpy).toBeCalledTimes(1); expect(killLauncherSpy).toBeCalledTimes(1);
expect(killNodeSpy).toBeCalledTimes(0); expect(killNodeSpy).toBeCalledTimes(0);
globalConfig.taskRunners.useLauncher = false; runnerConfig.mode = 'internal_childprocess';
}); });
}); });