diff --git a/docker/images/n8n/n8n-task-runners.json b/docker/images/n8n/n8n-task-runners.json index 4019189589..699794d504 100644 --- a/docker/images/n8n/n8n-task-runners.json +++ b/docker/images/n8n/n8n-task-runners.json @@ -10,6 +10,7 @@ "N8N_RUNNERS_GRANT_TOKEN", "N8N_RUNNERS_N8N_URI", "N8N_RUNNERS_MAX_PAYLOAD", + "N8N_RUNNERS_MAX_CONCURRENCY", "NODE_FUNCTION_ALLOW_BUILTIN", "NODE_FUNCTION_ALLOW_EXTERNAL", "NODE_OPTIONS" diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index f26b96636c..c7be197963 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -46,4 +46,8 @@ export class TaskRunnersConfig { /** The --max-old-space-size option to use for the runner (in MB). Default means node.js will determine it based on the available memory. */ @Env('N8N_RUNNERS_MAX_OLD_SPACE_SIZE') maxOldSpaceSize: string = ''; + + /** How many concurrent tasks can a runner execute at a time */ + @Env('N8N_RUNNERS_MAX_CONCURRENCY') + maxConcurrency: number = 5; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index cd5438e248..07af2c0a0b 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -232,6 +232,7 @@ describe('GlobalConfig', () => { launcherPath: '', launcherRunner: 'javascript', maxOldSpaceSize: '', + maxConcurrency: 5, }, sentry: { backendDsn: '', diff --git a/packages/@n8n/task-runner/package.json b/packages/@n8n/task-runner/package.json index d889bde6f3..fca6b9a22c 100644 --- a/packages/@n8n/task-runner/package.json +++ b/packages/@n8n/task-runner/package.json @@ -22,9 +22,11 @@ "dist/**/*" ], "dependencies": { + "@n8n/config": "workspace:*", "n8n-workflow": "workspace:*", "n8n-core": "workspace:*", "nanoid": "^3.3.6", + "typedi": "catalog:", "ws": "^8.18.0" }, "devDependencies": { diff --git a/packages/@n8n/task-runner/src/config/base-runner-config.ts b/packages/@n8n/task-runner/src/config/base-runner-config.ts new file mode 100644 index 0000000000..01e00c177a --- /dev/null +++ b/packages/@n8n/task-runner/src/config/base-runner-config.ts @@ -0,0 +1,16 @@ +import { Config, Env } from '@n8n/config'; + +@Config +export class BaseRunnerConfig { + @Env('N8N_RUNNERS_N8N_URI') + n8nUri: string = '127.0.0.1:5679'; + + @Env('N8N_RUNNERS_GRANT_TOKEN') + grantToken: string = ''; + + @Env('N8N_RUNNERS_MAX_PAYLOAD') + maxPayloadSize: number = 1024 * 1024 * 1024; + + @Env('N8N_RUNNERS_MAX_CONCURRENCY') + maxConcurrency: number = 5; +} diff --git a/packages/@n8n/task-runner/src/config/js-runner-config.ts b/packages/@n8n/task-runner/src/config/js-runner-config.ts new file mode 100644 index 0000000000..4cba6f1d98 --- /dev/null +++ b/packages/@n8n/task-runner/src/config/js-runner-config.ts @@ -0,0 +1,10 @@ +import { Config, Env } from '@n8n/config'; + +@Config +export class JsRunnerConfig { + @Env('NODE_FUNCTION_ALLOW_BUILTIN') + allowedBuiltInModules: string = ''; + + @Env('NODE_FUNCTION_ALLOW_EXTERNAL') + allowedExternalModules: string = ''; +} diff --git a/packages/@n8n/task-runner/src/config/main-config.ts b/packages/@n8n/task-runner/src/config/main-config.ts new file mode 100644 index 0000000000..a290c0c380 --- /dev/null +++ b/packages/@n8n/task-runner/src/config/main-config.ts @@ -0,0 +1,13 @@ +import { Config, Nested } from '@n8n/config'; + +import { BaseRunnerConfig } from './base-runner-config'; +import { JsRunnerConfig } from './js-runner-config'; + +@Config +export class MainConfig { + @Nested + baseRunnerConfig!: BaseRunnerConfig; + + @Nested + jsRunnerConfig!: JsRunnerConfig; +} diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index 5f83b23ae1..36f3c5afa2 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -4,7 +4,6 @@ import fs from 'node:fs'; import { builtinModules } from 'node:module'; import { ValidationError } from '@/js-task-runner/errors/validation-error'; -import type { JsTaskRunnerOpts } from '@/js-task-runner/js-task-runner'; import { JsTaskRunner, type AllCodeTaskData, @@ -13,17 +12,27 @@ import { import type { Task } from '@/task-runner'; import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data'; +import type { JsRunnerConfig } from '../../config/js-runner-config'; +import { MainConfig } from '../../config/main-config'; import { ExecutionError } from '../errors/execution-error'; jest.mock('ws'); +const defaultConfig = new MainConfig(); + describe('JsTaskRunner', () => { - const createRunnerWithOpts = (opts: Partial = {}) => + const createRunnerWithOpts = (opts: Partial = {}) => new JsTaskRunner({ - wsUrl: 'ws://localhost', - grantToken: 'grantToken', - maxConcurrency: 1, - ...opts, + baseRunnerConfig: { + ...defaultConfig.baseRunnerConfig, + grantToken: 'grantToken', + maxConcurrency: 1, + n8nUri: 'localhost', + }, + jsRunnerConfig: { + ...defaultConfig.jsRunnerConfig, + ...opts, + }, }); const defaultTaskRunner = createRunnerWithOpts(); diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index d7ec7d85f4..40ee12af2c 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -30,6 +30,7 @@ import { makeSerializable } from './errors/serializable-error'; import type { RequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver'; import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; +import type { MainConfig } from '../config/main-config'; export interface JSExecSettings { code: string; @@ -76,23 +77,6 @@ export interface AllCodeTaskData { additionalData: PartialAdditionalData; } -export interface JsTaskRunnerOpts { - wsUrl: string; - grantToken: string; - maxConcurrency: number; - name?: string; - /** - * List of built-in nodejs modules that are allowed to be required in the - * execution sandbox. Asterisk (*) can be used to allow all. - */ - allowedBuiltInModules?: string; - /** - * List of npm modules that are allowed to be required in the execution - * sandbox. Asterisk (*) can be used to allow all. - */ - allowedExternalModules?: string; -} - type CustomConsole = { log: (...args: unknown[]) => void; }; @@ -100,22 +84,20 @@ type CustomConsole = { export class JsTaskRunner extends TaskRunner { private readonly requireResolver: RequireResolver; - constructor({ - grantToken, - maxConcurrency, - wsUrl, - name = 'JS Task Runner', - allowedBuiltInModules, - allowedExternalModules, - }: JsTaskRunnerOpts) { - super('javascript', wsUrl, grantToken, maxConcurrency, name); + constructor(config: MainConfig, name = 'JS Task Runner') { + super({ + taskType: 'javascript', + name, + ...config.baseRunnerConfig, + }); + const { jsRunnerConfig } = config; const parseModuleAllowList = (moduleList: string) => moduleList === '*' ? null : new Set(moduleList.split(',').map((x) => x.trim())); this.requireResolver = createRequireResolver({ - allowedBuiltInModules: parseModuleAllowList(allowedBuiltInModules ?? ''), - allowedExternalModules: parseModuleAllowList(allowedExternalModules ?? ''), + allowedBuiltInModules: parseModuleAllowList(jsRunnerConfig.allowedBuiltInModules ?? ''), + allowedExternalModules: parseModuleAllowList(jsRunnerConfig.allowedExternalModules ?? ''), }); } diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index f5487ba6c2..fcaab84d51 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -1,27 +1,12 @@ -import { ApplicationError, ensureError } from 'n8n-workflow'; +import { ensureError } from 'n8n-workflow'; +import Container from 'typedi'; +import { MainConfig } from './config/main-config'; import { JsTaskRunner } from './js-task-runner/js-task-runner'; let runner: JsTaskRunner | undefined; let isShuttingDown = false; -type Config = { - n8nUri: string; - grantToken: string; -}; - -function readAndParseConfig(): Config { - const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN; - if (!grantToken) { - throw new ApplicationError('Missing N8N_RUNNERS_GRANT_TOKEN environment variable'); - } - - return { - n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? '127.0.0.1:5679', - grantToken, - }; -} - function createSignalHandler(signal: string) { return async function onSignal() { if (isShuttingDown) { @@ -46,16 +31,9 @@ function createSignalHandler(signal: string) { } void (async function start() { - const config = readAndParseConfig(); + const config = Container.get(MainConfig); - const wsUrl = `ws://${config.n8nUri}/runners/_ws`; - runner = new JsTaskRunner({ - wsUrl, - grantToken: config.grantToken, - maxConcurrency: 5, - allowedBuiltInModules: process.env.NODE_FUNCTION_ALLOW_BUILTIN, - allowedExternalModules: process.env.NODE_FUNCTION_ALLOW_EXTERNAL, - }); + runner = new JsTaskRunner(config); process.on('SIGINT', createSignalHandler('SIGINT')); process.on('SIGTERM', createSignalHandler('SIGTERM')); diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index 356afb69e5..9629cc15d5 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -1,8 +1,8 @@ import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow'; import { nanoid } from 'nanoid'; -import { URL } from 'node:url'; import { type MessageEvent, WebSocket } from 'ws'; +import type { BaseRunnerConfig } from './config/base-runner-config'; import { TaskRunnerNodeTypes } from './node-types'; import { RPC_ALLOW_LIST, @@ -42,7 +42,10 @@ export interface RPCCallObject { const VALID_TIME_MS = 1000; const VALID_EXTRA_MS = 100; -const DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 * 1024; +export interface TaskRunnerOpts extends BaseRunnerConfig { + taskType: string; + name?: string; +} export abstract class TaskRunner { id: string = nanoid(); @@ -63,22 +66,23 @@ export abstract class TaskRunner { nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]); - constructor( - public taskType: string, - wsUrl: string, - grantToken: string, - private maxConcurrency: number, - public name?: string, - ) { - const url = new URL(wsUrl); - url.searchParams.append('id', this.id); - this.ws = new WebSocket(url.toString(), { + taskType: string; + + maxConcurrency: number; + + name: string; + + constructor(opts: TaskRunnerOpts) { + this.taskType = opts.taskType; + this.name = opts.name ?? 'Node.js Task Runner SDK'; + this.maxConcurrency = opts.maxConcurrency; + + const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`; + this.ws = new WebSocket(wsUrl, { headers: { - authorization: `Bearer ${grantToken}`, + authorization: `Bearer ${opts.grantToken}`, }, - maxPayload: process.env.N8N_RUNNERS_MAX_PAYLOAD - ? parseInt(process.env.N8N_RUNNERS_MAX_PAYLOAD) - : DEFAULT_MAX_PAYLOAD_SIZE, + maxPayload: opts.maxPayloadSize, }); this.ws.addEventListener('message', this.receiveMessage); this.ws.addEventListener('close', this.stopTaskOffers); @@ -145,7 +149,7 @@ export abstract class TaskRunner { case 'broker:inforequest': this.send({ type: 'runner:info', - name: this.name ?? 'Node.js Task Runner SDK', + name: this.name, types: [this.taskType], }); break; diff --git a/packages/@n8n/task-runner/tsconfig.json b/packages/@n8n/task-runner/tsconfig.json index db6ad545e3..ddee64ec1f 100644 --- a/packages/@n8n/task-runner/tsconfig.json +++ b/packages/@n8n/task-runner/tsconfig.json @@ -2,6 +2,8 @@ "extends": ["../../../tsconfig.json", "../../../tsconfig.backend.json"], "compilerOptions": { "rootDir": ".", + "emitDecoratorMetadata": true, + "experimentalDecorators": true, "baseUrl": "src", "paths": { "@/*": ["./*"] diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index d453f1d134..5b31a96ba3 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -179,6 +179,7 @@ export class TaskRunnerProcess extends TypedEmitter { N8N_RUNNERS_GRANT_TOKEN: grantToken, N8N_RUNNERS_N8N_URI: n8nUri, N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(), + N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(), ...this.getPassthroughEnvVars(), }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fe5ba1e4f7..f527da5bb3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -642,6 +642,9 @@ importers: packages/@n8n/task-runner: dependencies: + '@n8n/config': + specifier: workspace:* + version: link:../config n8n-core: specifier: workspace:* version: link:../../core @@ -651,6 +654,9 @@ importers: nanoid: specifier: ^3.3.6 version: 3.3.7 + typedi: + specifier: 'catalog:' + version: 0.10.0(patch_hash=sk6omkefrosihg7lmqbzh7vfxe) ws: specifier: '>=8.17.1' version: 8.17.1