diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 5eb452b2b8..e7335e8827 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -11,4 +11,12 @@ export class TaskRunnersConfig { @Env('N8N_RUNNERS_AUTH_TOKEN') authToken: string = ''; + + /** IP address task runners server should listen on */ + @Env('N8N_RUNNERS_SERVER_PORT') + port: number = 5679; + + /** IP address task runners server should listen on */ + @Env('N8N_RUNNERS_SERVER_LISTEN_ADDRESS') + listen_address: string = '127.0.0.1'; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index a93f29d5f9..5c5de4e44a 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -225,6 +225,8 @@ describe('GlobalConfig', () => { disabled: true, path: '/runners', authToken: '', + listen_address: '127.0.0.1', + port: 5679, }, sentry: { backendDsn: '', diff --git a/packages/@n8n/task-runner-node-js/src/start.ts b/packages/@n8n/task-runner-node-js/src/start.ts index b845000b9c..c4cfdd5149 100644 --- a/packages/@n8n/task-runner-node-js/src/start.ts +++ b/packages/@n8n/task-runner-node-js/src/start.ts @@ -1,4 +1,6 @@ import * as a from 'node:assert/strict'; +import { ensureError } from 'n8n-workflow'; + import { JsTaskRunner } from './code'; import { authenticate } from './authenticator'; @@ -39,6 +41,10 @@ void (async function start() { }); } - const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`; + const wsUrl = `ws://${config.n8nUri}/runners/_ws`; _runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5); -})(); +})().catch((e) => { + const error = ensureError(e); + console.error('Task runner failed to start', { error }); + process.exit(1); +}); diff --git a/packages/cli/src/abstract-server.ts b/packages/cli/src/abstract-server.ts index 7f9cac807e..95ecaccdc5 100644 --- a/packages/cli/src/abstract-server.ts +++ b/packages/cli/src/abstract-server.ts @@ -119,8 +119,6 @@ export abstract class AbstractServer { protected setupPushServer() {} - protected setupRunnerServer() {} - private async setupHealthCheck() { // main health check should not care about DB connections this.app.get('/healthz', async (_req, res) => { @@ -184,10 +182,6 @@ export abstract class AbstractServer { if (!inTest) { await this.setupErrorHandlers(); this.setupPushServer(); - - if (!this.globalConfig.taskRunners.disabled) { - this.setupRunnerServer(); - } } this.setupCommonMiddlewares(); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 8dcdfe5a0e..36e690c37e 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -225,6 +225,10 @@ export class Start extends BaseCommand { if (!this.globalConfig.taskRunners.disabled) { Container.set(TaskManager, new SingleMainTaskManager()); + const { TaskRunnerServer } = await import('@/runners/task-runner-server'); + const taskRunnerServer = Container.get(TaskRunnerServer); + await taskRunnerServer.start(); + const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); const runnerProcess = Container.get(TaskRunnerProcess); await runnerProcess.start(); diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 447b32e42f..5d458ca376 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -168,6 +168,8 @@ export const ARTIFICIAL_TASK_DATA = { ], }; +/** Lowest priority, meaning shut down happens after other groups */ export const LOWEST_SHUTDOWN_PRIORITY = 0; export const DEFAULT_SHUTDOWN_PRIORITY = 100; +/** Highest priority, meaning shut down happens before all other groups */ export const HIGHEST_SHUTDOWN_PRIORITY = 200; diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index 3bee225649..04512435ae 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -44,7 +44,7 @@ export class TaskRunnerProcess { env: { PATH: process.env.PATH, N8N_RUNNERS_GRANT_TOKEN: grantToken, - N8N_RUNNERS_N8N_URI: `localhost:${this.globalConfig.port}`, + N8N_RUNNERS_N8N_URI: `localhost:${this.globalConfig.taskRunners.port}`, }, }); diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts new file mode 100644 index 0000000000..fc31c100a3 --- /dev/null +++ b/packages/cli/src/runners/task-runner-server.ts @@ -0,0 +1,201 @@ +import { GlobalConfig } from '@n8n/config'; +import compression from 'compression'; +import express from 'express'; +import * as a from 'node:assert/strict'; +import { randomBytes } from 'node:crypto'; +import { ServerResponse, type Server, createServer as createHttpServer } from 'node:http'; +import type { AddressInfo, Socket } from 'node:net'; +import { parse as parseUrl } from 'node:url'; +import { Service } from 'typedi'; +import { Server as WSServer } from 'ws'; + +import { inTest, LOWEST_SHUTDOWN_PRIORITY } from '@/constants'; +import { OnShutdown } from '@/decorators/on-shutdown'; +import { Logger } from '@/logging/logger.service'; +import { bodyParser, rawBodyReader } from '@/middlewares'; +import { send } from '@/response-helper'; +import { TaskRunnerAuthController } from '@/runners/auth/task-runner-auth.controller'; +import type { + TaskRunnerServerInitRequest, + TaskRunnerServerInitResponse, +} from '@/runners/runner-types'; +import { TaskRunnerService } from '@/runners/runner-ws-server'; + +/** + * Task Runner HTTP & WS server + */ +@Service() +export class TaskRunnerServer { + private server: Server | undefined; + + private wsServer: WSServer | undefined; + + readonly app: express.Application; + + public get port() { + return (this.server?.address() as AddressInfo)?.port; + } + + private get upgradeEndpoint() { + return `${this.getEndpointBasePath()}/_ws`; + } + + constructor( + private readonly logger: Logger, + private readonly globalConfig: GlobalConfig, + private readonly taskRunnerAuthController: TaskRunnerAuthController, + private readonly taskRunnerService: TaskRunnerService, + ) { + this.app = express(); + this.app.disable('x-powered-by'); + + if (!this.globalConfig.taskRunners.authToken) { + // Generate an auth token if one is not set + this.globalConfig.taskRunners.authToken = randomBytes(32).toString('hex'); + } + } + + async start(): Promise { + await this.setupHttpServer(); + + this.setupWsServer(); + + if (!inTest) { + await this.setupErrorHandlers(); + } + + this.setupCommonMiddlewares(); + + this.configureRoutes(); + } + + @OnShutdown(LOWEST_SHUTDOWN_PRIORITY) + async stop(): Promise { + if (this.wsServer) { + this.wsServer.close(); + this.wsServer = undefined; + } + if (this.server) { + await new Promise((resolve) => this.server?.close(() => resolve())); + this.server = undefined; + } + } + + /** Creates an HTTP server and listens to the configured port */ + private async setupHttpServer() { + const { app } = this; + + this.server = createHttpServer(app); + + const { + taskRunners: { port, listen_address: address }, + } = this.globalConfig; + + this.server.on('error', (error: Error & { code: string }) => { + if (error.code === 'EADDRINUSE') { + this.logger.info( + `n8n Task Runner's port ${port} is already in use. Do you have another instance of n8n running already?`, + ); + process.exit(1); + } + }); + + await new Promise((resolve) => { + a.ok(this.server); + this.server.listen(port, address, () => resolve()); + }); + + this.logger.info(`n8n Task Runner server ready on ${address}, port ${port}`); + } + + /** Creates WebSocket server for handling upgrade requests */ + private setupWsServer() { + const { authToken } = this.globalConfig.taskRunners; + a.ok(authToken); + a.ok(this.server); + + this.wsServer = new WSServer({ noServer: true }); + this.server.on('upgrade', this.handleUpgradeRequest); + } + + private async setupErrorHandlers() { + const { app } = this; + + // Augment errors sent to Sentry + const { + Handlers: { requestHandler, errorHandler }, + } = await import('@sentry/node'); + app.use(requestHandler()); + app.use(errorHandler()); + } + + private setupCommonMiddlewares() { + // Compress the response data + this.app.use(compression()); + + this.app.use(rawBodyReader); + this.app.use(bodyParser); + } + + private configureRoutes() { + this.app.use( + this.upgradeEndpoint, + // eslint-disable-next-line @typescript-eslint/unbound-method + this.taskRunnerAuthController.authMiddleware, + (req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) => + this.taskRunnerService.handleRequest(req, res), + ); + + const authEndpoint = `${this.getEndpointBasePath()}/auth`; + this.app.post( + authEndpoint, + send(async (req) => await this.taskRunnerAuthController.createGrantToken(req)), + ); + } + + private handleUpgradeRequest = ( + request: TaskRunnerServerInitRequest, + socket: Socket, + head: Buffer, + ) => { + if (parseUrl(request.url).pathname !== this.upgradeEndpoint) { + socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); + socket.destroy(); + return; + } + + if (!this.wsServer) { + // This might happen if the server is shutting down and we receive an upgrade request + socket.write('HTTP/1.1 503 Service Unavailable\r\n\r\n'); + socket.destroy(); + return; + } + + this.wsServer.handleUpgrade(request, socket, head, (ws) => { + request.ws = ws; + + const response = new ServerResponse(request); + response.writeHead = (statusCode) => { + if (statusCode > 200) ws.close(100); + return response; + }; + + // @ts-expect-error Delegate the request to the express app. This function is not exposed + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + this.app.handle(request, response); + }); + }; + + /** Returns the normalized base path for the task runner endpoints */ + private getEndpointBasePath() { + let path = this.globalConfig.taskRunners.path; + if (!path.startsWith('/')) { + path = `/${path}`; + } + if (path.endsWith('/')) { + path = path.slice(-1); + } + + return path; + } +} diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 24f467fc5a..b83e2bdb2a 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -31,7 +31,6 @@ import { isApiEnabled, loadPublicApiVersions } from '@/public-api'; import { setupPushServer, setupPushHandler, Push } from '@/push'; import type { APIRequest } from '@/requests'; import * as ResponseHelper from '@/response-helper'; -import { setupRunnerServer, setupRunnerHandler } from '@/runners/runner-ws-server'; import type { FrontendService } from '@/services/frontend.service'; import { OrchestrationService } from '@/services/orchestration.service'; @@ -202,10 +201,6 @@ export class Server extends AbstractServer { const { restEndpoint, app } = this; setupPushHandler(restEndpoint, app); - if (!this.globalConfig.taskRunners.disabled) { - setupRunnerHandler(restEndpoint, app); - } - const push = Container.get(Push); if (push.isBidirectional) { const { CollaborationService } = await import('@/collaboration/collaboration.service'); @@ -405,9 +400,4 @@ export class Server extends AbstractServer { const { restEndpoint, server, app } = this; setupPushServer(restEndpoint, server, app); } - - protected setupRunnerServer(): void { - const { restEndpoint, server, app } = this; - setupRunnerServer(restEndpoint, server, app); - } } diff --git a/packages/cli/test/integration/runners/task-runner-process.test.ts b/packages/cli/test/integration/runners/task-runner-process.test.ts index 57e3c1d480..f517ee6398 100644 --- a/packages/cli/test/integration/runners/task-runner-process.test.ts +++ b/packages/cli/test/integration/runners/task-runner-process.test.ts @@ -4,20 +4,30 @@ import Container from 'typedi'; import { TaskRunnerService } from '@/runners/runner-ws-server'; import { TaskBroker } from '@/runners/task-broker.service'; import { TaskRunnerProcess } from '@/runners/task-runner-process'; +import { TaskRunnerServer } from '@/runners/task-runner-server'; import { retryUntil } from '@test-integration/retry-until'; -import { setupTaskRunnerTestServer } from '@test-integration/utils/task-runner-test-server'; describe('TaskRunnerProcess', () => { const authToken = 'token'; const globalConfig = Container.get(GlobalConfig); globalConfig.taskRunners.authToken = authToken; - const testServer = setupTaskRunnerTestServer({}); - globalConfig.port = testServer.port; + globalConfig.taskRunners.port = 0; // Use any port + const taskRunnerServer = Container.get(TaskRunnerServer); const runnerProcess = Container.get(TaskRunnerProcess); const taskBroker = Container.get(TaskBroker); const taskRunnerService = Container.get(TaskRunnerService); + beforeAll(async () => { + await taskRunnerServer.start(); + // Set the port to the actually used port + globalConfig.taskRunners.port = taskRunnerServer.port; + }); + + afterAll(async () => { + await taskRunnerServer.stop(); + }); + afterEach(async () => { await runnerProcess.stop(); }); diff --git a/packages/cli/test/integration/shared/utils/task-runner-test-server.ts b/packages/cli/test/integration/shared/utils/task-runner-test-server.ts deleted file mode 100644 index 3eba046765..0000000000 --- a/packages/cli/test/integration/shared/utils/task-runner-test-server.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { GlobalConfig } from '@n8n/config'; -import cookieParser from 'cookie-parser'; -import type { Application } from 'express'; -import express from 'express'; -import type { Server } from 'node:http'; -import type { AddressInfo } from 'node:net'; -import Container from 'typedi'; - -import { rawBodyReader } from '@/middlewares'; -import { setupRunnerHandler, setupRunnerServer } from '@/runners/runner-ws-server'; - -export interface TaskRunnerTestServer { - app: Application; - httpServer: Server; - port: number; -} - -/** - * Sets up a task runner HTTP & WS server for testing purposes - */ -export const setupTaskRunnerTestServer = ({}): TaskRunnerTestServer => { - const app = express(); - app.use(rawBodyReader); - app.use(cookieParser()); - - const testServer: TaskRunnerTestServer = { - app, - httpServer: app.listen(0), - port: 0, - }; - - testServer.port = (testServer.httpServer.address() as AddressInfo).port; - - const globalConfig = Container.get(GlobalConfig); - setupRunnerServer(globalConfig.endpoints.rest, testServer.httpServer, testServer.app); - setupRunnerHandler(globalConfig.endpoints.rest, testServer.app); - - afterAll(async () => { - testServer.httpServer.close(); - }); - - return testServer; -};