From f414e90993ba2c5ae2c5ea82427f6071062d9b09 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 23 Oct 2024 14:34:08 +0300 Subject: [PATCH] refactor: Clean task runner code (#11368) --- .../__tests__/js-task-runner.test.ts | 19 ++++ packages/cli/src/runners/runner-ws-server.ts | 90 +------------------ 2 files changed, 20 insertions(+), 89 deletions(-) diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index d555412c59..5f83b23ae1 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -189,6 +189,25 @@ describe('JsTaskRunner', () => { ['{ wf: $workflow }', { wf: { active: true, id: '1', name: 'Test Workflow' } }], ['$vars', { var: 'value' }], ], + 'Node.js internal functions': [ + ['typeof Function', 'function'], + ['typeof eval', 'function'], + ['typeof setTimeout', 'function'], + ['typeof setInterval', 'function'], + ['typeof setImmediate', 'function'], + ['typeof clearTimeout', 'function'], + ['typeof clearInterval', 'function'], + ['typeof clearImmediate', 'function'], + ], + 'JS built-ins': [ + ['typeof btoa', 'function'], + ['typeof atob', 'function'], + ['typeof TextDecoder', 'function'], + ['typeof TextDecoderStream', 'function'], + ['typeof TextEncoder', 'function'], + ['typeof TextEncoderStream', 'function'], + ['typeof FormData', 'function'], + ], }; for (const [groupName, tests] of Object.entries(testGroups)) { diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index 61538f77a0..38b70c97dc 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -1,16 +1,7 @@ -import { GlobalConfig } from '@n8n/config'; -import type { Application } from 'express'; -import { ServerResponse, type Server } from 'http'; -import { ApplicationError } from 'n8n-workflow'; -import type { Socket } from 'net'; -import Container, { Service } from 'typedi'; -import { parse as parseUrl } from 'url'; -import { Server as WSServer } from 'ws'; +import { Service } from 'typedi'; import type WebSocket from 'ws'; import { Logger } from '@/logging/logger.service'; -import { send } from '@/response-helper'; -import { TaskRunnerAuthController } from '@/runners/auth/task-runner-auth.controller'; import type { RunnerMessage, @@ -24,24 +15,6 @@ function heartbeat(this: WebSocket) { this.isAlive = true; } -function getEndpointBasePath(restEndpoint: string) { - const globalConfig = Container.get(GlobalConfig); - - let path = globalConfig.taskRunners.path; - if (path.startsWith('/')) { - path = path.slice(1); - } - if (path.endsWith('/')) { - path = path.slice(-1); - } - - return `/${restEndpoint}/${path}`; -} - -function getWsEndpoint(restEndpoint: string) { - return `${getEndpointBasePath(restEndpoint)}/_ws`; -} - @Service() export class TaskRunnerService { runnerConnections: Map = new Map(); @@ -127,64 +100,3 @@ export class TaskRunnerService { this.add(req.query.id, req.ws); } } - -// Checks for upgrade requests on the runners path and upgrades the connection -// then, passes the request back to the app to handle the routing -export const setupRunnerServer = (restEndpoint: string, server: Server, app: Application) => { - const globalConfig = Container.get(GlobalConfig); - const { authToken } = globalConfig.taskRunners; - - if (!authToken) { - throw new ApplicationError( - 'Authentication token must be configured when task runners are enabled. Use N8N_RUNNERS_AUTH_TOKEN environment variable to set it.', - ); - } - - const endpoint = getWsEndpoint(restEndpoint); - const wsServer = new WSServer({ - noServer: true, - maxPayload: globalConfig.taskRunners.maxPayload, - }); - server.on('upgrade', (request: TaskRunnerServerInitRequest, socket: Socket, head) => { - if (parseUrl(request.url).pathname !== endpoint) { - // We can't close the connection here since the Push connections - // are using the same HTTP server and upgrade requests and this - // gets triggered for both - return; - } - - wsServer.handleUpgrade(request, socket, head, (ws) => { - request.ws = ws; - - const response = new ServerResponse(request); - response.writeHead = (statusCode) => { - if (statusCode > 200) ws.close(); - return response; - }; - - // @ts-expect-error Hidden API? - // eslint-disable-next-line @typescript-eslint/no-unsafe-call - app.handle(request, response); - }); - }); -}; - -export const setupRunnerHandler = (restEndpoint: string, app: Application) => { - const wsEndpoint = getWsEndpoint(restEndpoint); - const authEndpoint = `${getEndpointBasePath(restEndpoint)}/auth`; - - const taskRunnerAuthController = Container.get(TaskRunnerAuthController); - const taskRunnerService = Container.get(TaskRunnerService); - app.use( - wsEndpoint, - // eslint-disable-next-line @typescript-eslint/unbound-method - taskRunnerAuthController.authMiddleware, - (req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) => - taskRunnerService.handleRequest(req, res), - ); - - app.post( - authEndpoint, - send(async (req) => await taskRunnerAuthController.createGrantToken(req)), - ); -};