feat: Separate task runner server from main http server (no-changelog) (#11062)

This commit is contained in:
Tomi Turtiainen 2024-10-02 16:38:42 +03:00 committed by GitHub
parent 8d9eb162ae
commit 4546649c61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 239 additions and 65 deletions

View file

@ -11,4 +11,12 @@ export class TaskRunnersConfig {
@Env('N8N_RUNNERS_AUTH_TOKEN')
authToken: string = '';
/** IP address task runners server should listen on */
@Env('N8N_RUNNERS_SERVER_PORT')
port: number = 5679;
/** IP address task runners server should listen on */
@Env('N8N_RUNNERS_SERVER_LISTEN_ADDRESS')
listen_address: string = '127.0.0.1';
}

View file

@ -225,6 +225,8 @@ describe('GlobalConfig', () => {
disabled: true,
path: '/runners',
authToken: '',
listen_address: '127.0.0.1',
port: 5679,
},
sentry: {
backendDsn: '',

View file

@ -1,4 +1,6 @@
import * as a from 'node:assert/strict';
import { ensureError } from 'n8n-workflow';
import { JsTaskRunner } from './code';
import { authenticate } from './authenticator';
@ -39,6 +41,10 @@ void (async function start() {
});
}
const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`;
const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
_runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
})();
})().catch((e) => {
const error = ensureError(e);
console.error('Task runner failed to start', { error });
process.exit(1);
});

View file

@ -119,8 +119,6 @@ export abstract class AbstractServer {
protected setupPushServer() {}
protected setupRunnerServer() {}
private async setupHealthCheck() {
// main health check should not care about DB connections
this.app.get('/healthz', async (_req, res) => {
@ -184,10 +182,6 @@ export abstract class AbstractServer {
if (!inTest) {
await this.setupErrorHandlers();
this.setupPushServer();
if (!this.globalConfig.taskRunners.disabled) {
this.setupRunnerServer();
}
}
this.setupCommonMiddlewares();

View file

@ -225,6 +225,10 @@ export class Start extends BaseCommand {
if (!this.globalConfig.taskRunners.disabled) {
Container.set(TaskManager, new SingleMainTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();

View file

@ -168,6 +168,8 @@ export const ARTIFICIAL_TASK_DATA = {
],
};
/** Lowest priority, meaning shut down happens after other groups */
export const LOWEST_SHUTDOWN_PRIORITY = 0;
export const DEFAULT_SHUTDOWN_PRIORITY = 100;
/** Highest priority, meaning shut down happens before all other groups */
export const HIGHEST_SHUTDOWN_PRIORITY = 200;

View file

@ -44,7 +44,7 @@ export class TaskRunnerProcess {
env: {
PATH: process.env.PATH,
N8N_RUNNERS_GRANT_TOKEN: grantToken,
N8N_RUNNERS_N8N_URI: `localhost:${this.globalConfig.port}`,
N8N_RUNNERS_N8N_URI: `localhost:${this.globalConfig.taskRunners.port}`,
},
});

View file

@ -0,0 +1,201 @@
import { GlobalConfig } from '@n8n/config';
import compression from 'compression';
import express from 'express';
import * as a from 'node:assert/strict';
import { randomBytes } from 'node:crypto';
import { ServerResponse, type Server, createServer as createHttpServer } from 'node:http';
import type { AddressInfo, Socket } from 'node:net';
import { parse as parseUrl } from 'node:url';
import { Service } from 'typedi';
import { Server as WSServer } from 'ws';
import { inTest, LOWEST_SHUTDOWN_PRIORITY } from '@/constants';
import { OnShutdown } from '@/decorators/on-shutdown';
import { Logger } from '@/logging/logger.service';
import { bodyParser, rawBodyReader } from '@/middlewares';
import { send } from '@/response-helper';
import { TaskRunnerAuthController } from '@/runners/auth/task-runner-auth.controller';
import type {
TaskRunnerServerInitRequest,
TaskRunnerServerInitResponse,
} from '@/runners/runner-types';
import { TaskRunnerService } from '@/runners/runner-ws-server';
/**
* Task Runner HTTP & WS server
*/
@Service()
export class TaskRunnerServer {
private server: Server | undefined;
private wsServer: WSServer | undefined;
readonly app: express.Application;
public get port() {
return (this.server?.address() as AddressInfo)?.port;
}
private get upgradeEndpoint() {
return `${this.getEndpointBasePath()}/_ws`;
}
constructor(
private readonly logger: Logger,
private readonly globalConfig: GlobalConfig,
private readonly taskRunnerAuthController: TaskRunnerAuthController,
private readonly taskRunnerService: TaskRunnerService,
) {
this.app = express();
this.app.disable('x-powered-by');
if (!this.globalConfig.taskRunners.authToken) {
// Generate an auth token if one is not set
this.globalConfig.taskRunners.authToken = randomBytes(32).toString('hex');
}
}
async start(): Promise<void> {
await this.setupHttpServer();
this.setupWsServer();
if (!inTest) {
await this.setupErrorHandlers();
}
this.setupCommonMiddlewares();
this.configureRoutes();
}
@OnShutdown(LOWEST_SHUTDOWN_PRIORITY)
async stop(): Promise<void> {
if (this.wsServer) {
this.wsServer.close();
this.wsServer = undefined;
}
if (this.server) {
await new Promise<void>((resolve) => this.server?.close(() => resolve()));
this.server = undefined;
}
}
/** Creates an HTTP server and listens to the configured port */
private async setupHttpServer() {
const { app } = this;
this.server = createHttpServer(app);
const {
taskRunners: { port, listen_address: address },
} = this.globalConfig;
this.server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
this.logger.info(
`n8n Task Runner's port ${port} is already in use. Do you have another instance of n8n running already?`,
);
process.exit(1);
}
});
await new Promise<void>((resolve) => {
a.ok(this.server);
this.server.listen(port, address, () => resolve());
});
this.logger.info(`n8n Task Runner server ready on ${address}, port ${port}`);
}
/** Creates WebSocket server for handling upgrade requests */
private setupWsServer() {
const { authToken } = this.globalConfig.taskRunners;
a.ok(authToken);
a.ok(this.server);
this.wsServer = new WSServer({ noServer: true });
this.server.on('upgrade', this.handleUpgradeRequest);
}
private async setupErrorHandlers() {
const { app } = this;
// Augment errors sent to Sentry
const {
Handlers: { requestHandler, errorHandler },
} = await import('@sentry/node');
app.use(requestHandler());
app.use(errorHandler());
}
private setupCommonMiddlewares() {
// Compress the response data
this.app.use(compression());
this.app.use(rawBodyReader);
this.app.use(bodyParser);
}
private configureRoutes() {
this.app.use(
this.upgradeEndpoint,
// eslint-disable-next-line @typescript-eslint/unbound-method
this.taskRunnerAuthController.authMiddleware,
(req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) =>
this.taskRunnerService.handleRequest(req, res),
);
const authEndpoint = `${this.getEndpointBasePath()}/auth`;
this.app.post(
authEndpoint,
send(async (req) => await this.taskRunnerAuthController.createGrantToken(req)),
);
}
private handleUpgradeRequest = (
request: TaskRunnerServerInitRequest,
socket: Socket,
head: Buffer,
) => {
if (parseUrl(request.url).pathname !== this.upgradeEndpoint) {
socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
socket.destroy();
return;
}
if (!this.wsServer) {
// This might happen if the server is shutting down and we receive an upgrade request
socket.write('HTTP/1.1 503 Service Unavailable\r\n\r\n');
socket.destroy();
return;
}
this.wsServer.handleUpgrade(request, socket, head, (ws) => {
request.ws = ws;
const response = new ServerResponse(request);
response.writeHead = (statusCode) => {
if (statusCode > 200) ws.close(100);
return response;
};
// @ts-expect-error Delegate the request to the express app. This function is not exposed
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
this.app.handle(request, response);
});
};
/** Returns the normalized base path for the task runner endpoints */
private getEndpointBasePath() {
let path = this.globalConfig.taskRunners.path;
if (!path.startsWith('/')) {
path = `/${path}`;
}
if (path.endsWith('/')) {
path = path.slice(-1);
}
return path;
}
}

View file

@ -31,7 +31,6 @@ import { isApiEnabled, loadPublicApiVersions } from '@/public-api';
import { setupPushServer, setupPushHandler, Push } from '@/push';
import type { APIRequest } from '@/requests';
import * as ResponseHelper from '@/response-helper';
import { setupRunnerServer, setupRunnerHandler } from '@/runners/runner-ws-server';
import type { FrontendService } from '@/services/frontend.service';
import { OrchestrationService } from '@/services/orchestration.service';
@ -202,10 +201,6 @@ export class Server extends AbstractServer {
const { restEndpoint, app } = this;
setupPushHandler(restEndpoint, app);
if (!this.globalConfig.taskRunners.disabled) {
setupRunnerHandler(restEndpoint, app);
}
const push = Container.get(Push);
if (push.isBidirectional) {
const { CollaborationService } = await import('@/collaboration/collaboration.service');
@ -405,9 +400,4 @@ export class Server extends AbstractServer {
const { restEndpoint, server, app } = this;
setupPushServer(restEndpoint, server, app);
}
protected setupRunnerServer(): void {
const { restEndpoint, server, app } = this;
setupRunnerServer(restEndpoint, server, app);
}
}

View file

@ -4,20 +4,30 @@ import Container from 'typedi';
import { TaskRunnerService } from '@/runners/runner-ws-server';
import { TaskBroker } from '@/runners/task-broker.service';
import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { TaskRunnerServer } from '@/runners/task-runner-server';
import { retryUntil } from '@test-integration/retry-until';
import { setupTaskRunnerTestServer } from '@test-integration/utils/task-runner-test-server';
describe('TaskRunnerProcess', () => {
const authToken = 'token';
const globalConfig = Container.get(GlobalConfig);
globalConfig.taskRunners.authToken = authToken;
const testServer = setupTaskRunnerTestServer({});
globalConfig.port = testServer.port;
globalConfig.taskRunners.port = 0; // Use any port
const taskRunnerServer = Container.get(TaskRunnerServer);
const runnerProcess = Container.get(TaskRunnerProcess);
const taskBroker = Container.get(TaskBroker);
const taskRunnerService = Container.get(TaskRunnerService);
beforeAll(async () => {
await taskRunnerServer.start();
// Set the port to the actually used port
globalConfig.taskRunners.port = taskRunnerServer.port;
});
afterAll(async () => {
await taskRunnerServer.stop();
});
afterEach(async () => {
await runnerProcess.stop();
});

View file

@ -1,43 +0,0 @@
import { GlobalConfig } from '@n8n/config';
import cookieParser from 'cookie-parser';
import type { Application } from 'express';
import express from 'express';
import type { Server } from 'node:http';
import type { AddressInfo } from 'node:net';
import Container from 'typedi';
import { rawBodyReader } from '@/middlewares';
import { setupRunnerHandler, setupRunnerServer } from '@/runners/runner-ws-server';
export interface TaskRunnerTestServer {
app: Application;
httpServer: Server;
port: number;
}
/**
* Sets up a task runner HTTP & WS server for testing purposes
*/
export const setupTaskRunnerTestServer = ({}): TaskRunnerTestServer => {
const app = express();
app.use(rawBodyReader);
app.use(cookieParser());
const testServer: TaskRunnerTestServer = {
app,
httpServer: app.listen(0),
port: 0,
};
testServer.port = (testServer.httpServer.address() as AddressInfo).port;
const globalConfig = Container.get(GlobalConfig);
setupRunnerServer(globalConfig.endpoints.rest, testServer.httpServer, testServer.app);
setupRunnerHandler(globalConfig.endpoints.rest, testServer.app);
afterAll(async () => {
testServer.httpServer.close();
});
return testServer;
};