feat: Make runner concurrency configurable (no-changelog) (#11448)
Some checks failed
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Has been cancelled

This commit is contained in:
Tomi Turtiainen 2024-10-29 21:08:50 +02:00 committed by GitHub
parent ea47b025fb
commit d7ba206b30
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 107 additions and 78 deletions

View file

@ -10,6 +10,7 @@
"N8N_RUNNERS_GRANT_TOKEN",
"N8N_RUNNERS_N8N_URI",
"N8N_RUNNERS_MAX_PAYLOAD",
"N8N_RUNNERS_MAX_CONCURRENCY",
"NODE_FUNCTION_ALLOW_BUILTIN",
"NODE_FUNCTION_ALLOW_EXTERNAL",
"NODE_OPTIONS"

View file

@ -46,4 +46,8 @@ export class TaskRunnersConfig {
/** The --max-old-space-size option to use for the runner (in MB). Default means node.js will determine it based on the available memory. */
@Env('N8N_RUNNERS_MAX_OLD_SPACE_SIZE')
maxOldSpaceSize: string = '';
/** How many concurrent tasks can a runner execute at a time */
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
}

View file

@ -232,6 +232,7 @@ describe('GlobalConfig', () => {
launcherPath: '',
launcherRunner: 'javascript',
maxOldSpaceSize: '',
maxConcurrency: 5,
},
sentry: {
backendDsn: '',

View file

@ -22,9 +22,11 @@
"dist/**/*"
],
"dependencies": {
"@n8n/config": "workspace:*",
"n8n-workflow": "workspace:*",
"n8n-core": "workspace:*",
"nanoid": "^3.3.6",
"typedi": "catalog:",
"ws": "^8.18.0"
},
"devDependencies": {

View file

@ -0,0 +1,16 @@
import { Config, Env } from '@n8n/config';
@Config
export class BaseRunnerConfig {
@Env('N8N_RUNNERS_N8N_URI')
n8nUri: string = '127.0.0.1:5679';
@Env('N8N_RUNNERS_GRANT_TOKEN')
grantToken: string = '';
@Env('N8N_RUNNERS_MAX_PAYLOAD')
maxPayloadSize: number = 1024 * 1024 * 1024;
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
}

View file

@ -0,0 +1,10 @@
import { Config, Env } from '@n8n/config';
@Config
export class JsRunnerConfig {
@Env('NODE_FUNCTION_ALLOW_BUILTIN')
allowedBuiltInModules: string = '';
@Env('NODE_FUNCTION_ALLOW_EXTERNAL')
allowedExternalModules: string = '';
}

View file

@ -0,0 +1,13 @@
import { Config, Nested } from '@n8n/config';
import { BaseRunnerConfig } from './base-runner-config';
import { JsRunnerConfig } from './js-runner-config';
@Config
export class MainConfig {
@Nested
baseRunnerConfig!: BaseRunnerConfig;
@Nested
jsRunnerConfig!: JsRunnerConfig;
}

View file

@ -4,7 +4,6 @@ import fs from 'node:fs';
import { builtinModules } from 'node:module';
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import type { JsTaskRunnerOpts } from '@/js-task-runner/js-task-runner';
import {
JsTaskRunner,
type AllCodeTaskData,
@ -13,17 +12,27 @@ import {
import type { Task } from '@/task-runner';
import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data';
import type { JsRunnerConfig } from '../../config/js-runner-config';
import { MainConfig } from '../../config/main-config';
import { ExecutionError } from '../errors/execution-error';
jest.mock('ws');
const defaultConfig = new MainConfig();
describe('JsTaskRunner', () => {
const createRunnerWithOpts = (opts: Partial<JsTaskRunnerOpts> = {}) =>
const createRunnerWithOpts = (opts: Partial<JsRunnerConfig> = {}) =>
new JsTaskRunner({
wsUrl: 'ws://localhost',
grantToken: 'grantToken',
maxConcurrency: 1,
...opts,
baseRunnerConfig: {
...defaultConfig.baseRunnerConfig,
grantToken: 'grantToken',
maxConcurrency: 1,
n8nUri: 'localhost',
},
jsRunnerConfig: {
...defaultConfig.jsRunnerConfig,
...opts,
},
});
const defaultTaskRunner = createRunnerWithOpts();

View file

@ -30,6 +30,7 @@ import { makeSerializable } from './errors/serializable-error';
import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
import type { MainConfig } from '../config/main-config';
export interface JSExecSettings {
code: string;
@ -76,23 +77,6 @@ export interface AllCodeTaskData {
additionalData: PartialAdditionalData;
}
export interface JsTaskRunnerOpts {
wsUrl: string;
grantToken: string;
maxConcurrency: number;
name?: string;
/**
* List of built-in nodejs modules that are allowed to be required in the
* execution sandbox. Asterisk (*) can be used to allow all.
*/
allowedBuiltInModules?: string;
/**
* List of npm modules that are allowed to be required in the execution
* sandbox. Asterisk (*) can be used to allow all.
*/
allowedExternalModules?: string;
}
type CustomConsole = {
log: (...args: unknown[]) => void;
};
@ -100,22 +84,20 @@ type CustomConsole = {
export class JsTaskRunner extends TaskRunner {
private readonly requireResolver: RequireResolver;
constructor({
grantToken,
maxConcurrency,
wsUrl,
name = 'JS Task Runner',
allowedBuiltInModules,
allowedExternalModules,
}: JsTaskRunnerOpts) {
super('javascript', wsUrl, grantToken, maxConcurrency, name);
constructor(config: MainConfig, name = 'JS Task Runner') {
super({
taskType: 'javascript',
name,
...config.baseRunnerConfig,
});
const { jsRunnerConfig } = config;
const parseModuleAllowList = (moduleList: string) =>
moduleList === '*' ? null : new Set(moduleList.split(',').map((x) => x.trim()));
this.requireResolver = createRequireResolver({
allowedBuiltInModules: parseModuleAllowList(allowedBuiltInModules ?? ''),
allowedExternalModules: parseModuleAllowList(allowedExternalModules ?? ''),
allowedBuiltInModules: parseModuleAllowList(jsRunnerConfig.allowedBuiltInModules ?? ''),
allowedExternalModules: parseModuleAllowList(jsRunnerConfig.allowedExternalModules ?? ''),
});
}

View file

@ -1,27 +1,12 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { ensureError } from 'n8n-workflow';
import Container from 'typedi';
import { MainConfig } from './config/main-config';
import { JsTaskRunner } from './js-task-runner/js-task-runner';
let runner: JsTaskRunner | undefined;
let isShuttingDown = false;
type Config = {
n8nUri: string;
grantToken: string;
};
function readAndParseConfig(): Config {
const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN;
if (!grantToken) {
throw new ApplicationError('Missing N8N_RUNNERS_GRANT_TOKEN environment variable');
}
return {
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? '127.0.0.1:5679',
grantToken,
};
}
function createSignalHandler(signal: string) {
return async function onSignal() {
if (isShuttingDown) {
@ -46,16 +31,9 @@ function createSignalHandler(signal: string) {
}
void (async function start() {
const config = readAndParseConfig();
const config = Container.get(MainConfig);
const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
runner = new JsTaskRunner({
wsUrl,
grantToken: config.grantToken,
maxConcurrency: 5,
allowedBuiltInModules: process.env.NODE_FUNCTION_ALLOW_BUILTIN,
allowedExternalModules: process.env.NODE_FUNCTION_ALLOW_EXTERNAL,
});
runner = new JsTaskRunner(config);
process.on('SIGINT', createSignalHandler('SIGINT'));
process.on('SIGTERM', createSignalHandler('SIGTERM'));

View file

@ -1,8 +1,8 @@
import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { URL } from 'node:url';
import { type MessageEvent, WebSocket } from 'ws';
import type { BaseRunnerConfig } from './config/base-runner-config';
import { TaskRunnerNodeTypes } from './node-types';
import {
RPC_ALLOW_LIST,
@ -42,7 +42,10 @@ export interface RPCCallObject {
const VALID_TIME_MS = 1000;
const VALID_EXTRA_MS = 100;
const DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 * 1024;
export interface TaskRunnerOpts extends BaseRunnerConfig {
taskType: string;
name?: string;
}
export abstract class TaskRunner {
id: string = nanoid();
@ -63,22 +66,23 @@ export abstract class TaskRunner {
nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]);
constructor(
public taskType: string,
wsUrl: string,
grantToken: string,
private maxConcurrency: number,
public name?: string,
) {
const url = new URL(wsUrl);
url.searchParams.append('id', this.id);
this.ws = new WebSocket(url.toString(), {
taskType: string;
maxConcurrency: number;
name: string;
constructor(opts: TaskRunnerOpts) {
this.taskType = opts.taskType;
this.name = opts.name ?? 'Node.js Task Runner SDK';
this.maxConcurrency = opts.maxConcurrency;
const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`;
this.ws = new WebSocket(wsUrl, {
headers: {
authorization: `Bearer ${grantToken}`,
authorization: `Bearer ${opts.grantToken}`,
},
maxPayload: process.env.N8N_RUNNERS_MAX_PAYLOAD
? parseInt(process.env.N8N_RUNNERS_MAX_PAYLOAD)
: DEFAULT_MAX_PAYLOAD_SIZE,
maxPayload: opts.maxPayloadSize,
});
this.ws.addEventListener('message', this.receiveMessage);
this.ws.addEventListener('close', this.stopTaskOffers);
@ -145,7 +149,7 @@ export abstract class TaskRunner {
case 'broker:inforequest':
this.send({
type: 'runner:info',
name: this.name ?? 'Node.js Task Runner SDK',
name: this.name,
types: [this.taskType],
});
break;

View file

@ -2,6 +2,8 @@
"extends": ["../../../tsconfig.json", "../../../tsconfig.backend.json"],
"compilerOptions": {
"rootDir": ".",
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"baseUrl": "src",
"paths": {
"@/*": ["./*"]

View file

@ -179,6 +179,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
N8N_RUNNERS_GRANT_TOKEN: grantToken,
N8N_RUNNERS_N8N_URI: n8nUri,
N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(),
N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(),
...this.getPassthroughEnvVars(),
};

View file

@ -642,6 +642,9 @@ importers:
packages/@n8n/task-runner:
dependencies:
'@n8n/config':
specifier: workspace:*
version: link:../config
n8n-core:
specifier: workspace:*
version: link:../../core
@ -651,6 +654,9 @@ importers:
nanoid:
specifier: ^3.3.6
version: 3.3.7
typedi:
specifier: 'catalog:'
version: 0.10.0(patch_hash=sk6omkefrosihg7lmqbzh7vfxe)
ws:
specifier: '>=8.17.1'
version: 8.17.1