feat: Make task runners work with n8n from npm (no-changelog) (#11015)

This commit is contained in:
Tomi Turtiainen 2024-10-02 15:16:02 +03:00 committed by GitHub
parent 6105bfeb4b
commit 74fa259b37
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 245 additions and 18 deletions

View file

@ -3,7 +3,7 @@
"private": true, "private": true,
"version": "0.1.0", "version": "0.1.0",
"description": "", "description": "",
"main": "dist/index.js", "main": "dist/start.js",
"scripts": { "scripts": {
"start": "node dist/start.js", "start": "node dist/start.js",
"dev": "pnpm build && pnpm start", "dev": "pnpm build && pnpm start",
@ -23,9 +23,9 @@
"package.json", "package.json",
"tsconfig.json" "tsconfig.json"
], ],
"main": "dist/index.js", "main": "dist/start.js",
"module": "src/index.ts", "module": "src/start.ts",
"types": "dist/index.d.ts", "types": "dist/start.d.ts",
"packageManager": "pnpm@9.6.0", "packageManager": "pnpm@9.6.0",
"devDependencies": { "devDependencies": {
"@n8n_io/eslint-config": "^0.0.2", "@n8n_io/eslint-config": "^0.0.2",

View file

@ -1,5 +1,4 @@
import * as a from 'node:assert/strict'; import * as a from 'node:assert/strict';
import { JsTaskRunner } from './code'; import { JsTaskRunner } from './code';
import { authenticate } from './authenticator'; import { authenticate } from './authenticator';
@ -7,28 +6,39 @@ let _runner: JsTaskRunner;
type Config = { type Config = {
n8nUri: string; n8nUri: string;
authToken: string; authToken?: string;
grantToken?: string;
}; };
function readAndParseConfig(): Config { function readAndParseConfig(): Config {
const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN; const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN;
a.ok(authToken, 'Missing task runner auth token. Use N8N_RUNNERS_AUTH_TOKEN to configure it'); const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN;
if (!authToken && !grantToken) {
throw new Error(
'Missing task runner authentication. Use either N8N_RUNNERS_AUTH_TOKEN or N8N_RUNNERS_GRANT_TOKEN to configure it',
);
}
return { return {
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? 'localhost:5678', n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? 'localhost:5678',
authToken, authToken,
grantToken,
}; };
} }
void (async function start() { void (async function start() {
const config = readAndParseConfig(); const config = readAndParseConfig();
const grantToken = await authenticate({ let grantToken = config.grantToken;
if (!grantToken) {
a.ok(config.authToken);
grantToken = await authenticate({
authToken: config.authToken, authToken: config.authToken,
n8nUri: config.n8nUri, n8nUri: config.n8nUri,
}); });
}
const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`; const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`;
_runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5); _runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
})(); })();

View file

@ -225,6 +225,9 @@ export class Start extends BaseCommand {
if (!this.globalConfig.taskRunners.disabled) { if (!this.globalConfig.taskRunners.disabled) {
Container.set(TaskManager, new SingleMainTaskManager()); Container.set(TaskManager, new SingleMainTaskManager());
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();
} }
} }

View file

@ -44,7 +44,7 @@ function getWsEndpoint(restEndpoint: string) {
@Service() @Service()
export class TaskRunnerService { export class TaskRunnerService {
runnerConnections: Record<TaskRunner['id'], WebSocket> = {}; runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
@ -52,7 +52,7 @@ export class TaskRunnerService {
) {} ) {}
sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) { sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) {
this.runnerConnections[id]?.send(JSON.stringify(message)); this.runnerConnections.get(id)?.send(JSON.stringify(message));
} }
add(id: TaskRunner['id'], connection: WebSocket) { add(id: TaskRunner['id'], connection: WebSocket) {
@ -75,7 +75,7 @@ export class TaskRunnerService {
this.removeConnection(id); this.removeConnection(id);
isConnected = true; isConnected = true;
this.runnerConnections[id] = connection; this.runnerConnections.set(id, connection);
this.taskBroker.registerRunner( this.taskBroker.registerRunner(
{ {
@ -117,10 +117,11 @@ export class TaskRunnerService {
} }
removeConnection(id: TaskRunner['id']) { removeConnection(id: TaskRunner['id']) {
if (id in this.runnerConnections) { const connection = this.runnerConnections.get(id);
if (connection) {
this.taskBroker.deregisterRunner(id); this.taskBroker.deregisterRunner(id);
this.runnerConnections[id].close(); connection.close();
delete this.runnerConnections[id]; this.runnerConnections.delete(id);
} }
} }

View file

@ -0,0 +1,89 @@
import { GlobalConfig } from '@n8n/config';
import * as a from 'node:assert/strict';
import { spawn } from 'node:child_process';
import { Service } from 'typedi';
import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
import { OnShutdown } from '../decorators/on-shutdown';
type ChildProcess = ReturnType<typeof spawn>;
/**
* Manages the JS task runner process as a child process
*/
@Service()
export class TaskRunnerProcess {
public get isRunning() {
return this.process !== null;
}
/** The process ID of the task runner process */
public get pid() {
return this.process?.pid;
}
private process: ChildProcess | null = null;
/** Promise that resolves after the process has exited */
private runPromise: Promise<void> | null = null;
private isShuttingDown = false;
constructor(
private readonly globalConfig: GlobalConfig,
private readonly authService: TaskRunnerAuthService,
) {}
async start() {
a.ok(!this.process, 'Task Runner Process already running');
const grantToken = await this.authService.createGrantToken();
const startScript = require.resolve('@n8n/task-runner');
this.process = spawn('node', [startScript], {
env: {
PATH: process.env.PATH,
N8N_RUNNERS_GRANT_TOKEN: grantToken,
N8N_RUNNERS_N8N_URI: `localhost:${this.globalConfig.port}`,
},
});
this.process.stdout?.pipe(process.stdout);
this.process.stderr?.pipe(process.stderr);
this.monitorProcess(this.process);
}
@OnShutdown()
async stop() {
if (!this.process) {
return;
}
this.isShuttingDown = true;
// TODO: Timeout & force kill
this.process.kill();
await this.runPromise;
this.isShuttingDown = false;
}
private monitorProcess(process: ChildProcess) {
this.runPromise = new Promise((resolve) => {
process.on('exit', (code) => {
this.onProcessExit(code, resolve);
});
});
}
private onProcessExit(_code: number | null, resolveFn: () => void) {
this.process = null;
resolveFn();
// If we are not shutting down, restart the process
if (!this.isShuttingDown) {
setImmediate(async () => await this.start());
}
}
}

View file

@ -0,0 +1,81 @@
import { GlobalConfig } from '@n8n/config';
import Container from 'typedi';
import { TaskRunnerService } from '@/runners/runner-ws-server';
import { TaskBroker } from '@/runners/task-broker.service';
import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { retryUntil } from '@test-integration/retry-until';
import { setupTaskRunnerTestServer } from '@test-integration/utils/task-runner-test-server';
describe('TaskRunnerProcess', () => {
const authToken = 'token';
const globalConfig = Container.get(GlobalConfig);
globalConfig.taskRunners.authToken = authToken;
const testServer = setupTaskRunnerTestServer({});
globalConfig.port = testServer.port;
const runnerProcess = Container.get(TaskRunnerProcess);
const taskBroker = Container.get(TaskBroker);
const taskRunnerService = Container.get(TaskRunnerService);
afterEach(async () => {
await runnerProcess.stop();
});
const getNumConnectedRunners = () => taskRunnerService.runnerConnections.size;
const getNumRegisteredRunners = () => taskBroker.getKnownRunners().size;
it('should start and connect the task runner', async () => {
// Act
await runnerProcess.start();
// Assert
expect(runnerProcess.isRunning).toBeTruthy();
// Wait until the runner has connected
await retryUntil(() => expect(getNumConnectedRunners()).toBe(1));
expect(getNumRegisteredRunners()).toBe(1);
});
it('should stop an disconnect the task runner', async () => {
// Arrange
await runnerProcess.start();
// Wait until the runner has connected
await retryUntil(() => expect(getNumConnectedRunners()).toBe(1));
expect(getNumRegisteredRunners()).toBe(1);
// Act
await runnerProcess.stop();
// Assert
// Wait until the runner has disconnected
await retryUntil(() => expect(getNumConnectedRunners()).toBe(0));
expect(runnerProcess.isRunning).toBeFalsy();
expect(getNumRegisteredRunners()).toBe(0);
});
it('should restart the task runner if it exits', async () => {
// Arrange
await runnerProcess.start();
// Wait until the runner has connected
await retryUntil(() => expect(getNumConnectedRunners()).toBe(1));
const processId = runnerProcess.pid;
// Act
// @ts-expect-error private property
runnerProcess.process?.kill('SIGKILL');
// Assert
// Wait until the runner is running again
await retryUntil(() => expect(runnerProcess.isRunning).toBeTruthy());
expect(runnerProcess.pid).not.toBe(processId);
// Wait until the runner has connected again
await retryUntil(() => expect(getNumConnectedRunners()).toBe(1));
expect(getNumConnectedRunners()).toBe(1);
expect(getNumRegisteredRunners()).toBe(1);
});
});

View file

@ -0,0 +1,43 @@
import { GlobalConfig } from '@n8n/config';
import cookieParser from 'cookie-parser';
import type { Application } from 'express';
import express from 'express';
import type { Server } from 'node:http';
import type { AddressInfo } from 'node:net';
import Container from 'typedi';
import { rawBodyReader } from '@/middlewares';
import { setupRunnerHandler, setupRunnerServer } from '@/runners/runner-ws-server';
export interface TaskRunnerTestServer {
app: Application;
httpServer: Server;
port: number;
}
/**
* Sets up a task runner HTTP & WS server for testing purposes
*/
export const setupTaskRunnerTestServer = ({}): TaskRunnerTestServer => {
const app = express();
app.use(rawBodyReader);
app.use(cookieParser());
const testServer: TaskRunnerTestServer = {
app,
httpServer: app.listen(0),
port: 0,
};
testServer.port = (testServer.httpServer.address() as AddressInfo).port;
const globalConfig = Container.get(GlobalConfig);
setupRunnerServer(globalConfig.endpoints.rest, testServer.httpServer, testServer.app);
setupRunnerHandler(globalConfig.endpoints.rest, testServer.app);
afterAll(async () => {
testServer.httpServer.close();
});
return testServer;
};