refactor: Clean task runner code (#11368)

This commit is contained in:
Tomi Turtiainen 2024-10-23 14:34:08 +03:00 committed by GitHub
parent 74fc3889b9
commit f414e90993
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 20 additions and 89 deletions

View file

@ -189,6 +189,25 @@ describe('JsTaskRunner', () => {
['{ wf: $workflow }', { wf: { active: true, id: '1', name: 'Test Workflow' } }],
['$vars', { var: 'value' }],
],
'Node.js internal functions': [
['typeof Function', 'function'],
['typeof eval', 'function'],
['typeof setTimeout', 'function'],
['typeof setInterval', 'function'],
['typeof setImmediate', 'function'],
['typeof clearTimeout', 'function'],
['typeof clearInterval', 'function'],
['typeof clearImmediate', 'function'],
],
'JS built-ins': [
['typeof btoa', 'function'],
['typeof atob', 'function'],
['typeof TextDecoder', 'function'],
['typeof TextDecoderStream', 'function'],
['typeof TextEncoder', 'function'],
['typeof TextEncoderStream', 'function'],
['typeof FormData', 'function'],
],
};
for (const [groupName, tests] of Object.entries(testGroups)) {

View file

@ -1,16 +1,7 @@
import { GlobalConfig } from '@n8n/config';
import type { Application } from 'express';
import { ServerResponse, type Server } from 'http';
import { ApplicationError } from 'n8n-workflow';
import type { Socket } from 'net';
import Container, { Service } from 'typedi';
import { parse as parseUrl } from 'url';
import { Server as WSServer } from 'ws';
import { Service } from 'typedi';
import type WebSocket from 'ws';
import { Logger } from '@/logging/logger.service';
import { send } from '@/response-helper';
import { TaskRunnerAuthController } from '@/runners/auth/task-runner-auth.controller';
import type {
RunnerMessage,
@ -24,24 +15,6 @@ function heartbeat(this: WebSocket) {
this.isAlive = true;
}
function getEndpointBasePath(restEndpoint: string) {
const globalConfig = Container.get(GlobalConfig);
let path = globalConfig.taskRunners.path;
if (path.startsWith('/')) {
path = path.slice(1);
}
if (path.endsWith('/')) {
path = path.slice(-1);
}
return `/${restEndpoint}/${path}`;
}
function getWsEndpoint(restEndpoint: string) {
return `${getEndpointBasePath(restEndpoint)}/_ws`;
}
@Service()
export class TaskRunnerService {
runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();
@ -127,64 +100,3 @@ export class TaskRunnerService {
this.add(req.query.id, req.ws);
}
}
// Checks for upgrade requests on the runners path and upgrades the connection
// then, passes the request back to the app to handle the routing
export const setupRunnerServer = (restEndpoint: string, server: Server, app: Application) => {
const globalConfig = Container.get(GlobalConfig);
const { authToken } = globalConfig.taskRunners;
if (!authToken) {
throw new ApplicationError(
'Authentication token must be configured when task runners are enabled. Use N8N_RUNNERS_AUTH_TOKEN environment variable to set it.',
);
}
const endpoint = getWsEndpoint(restEndpoint);
const wsServer = new WSServer({
noServer: true,
maxPayload: globalConfig.taskRunners.maxPayload,
});
server.on('upgrade', (request: TaskRunnerServerInitRequest, socket: Socket, head) => {
if (parseUrl(request.url).pathname !== endpoint) {
// We can't close the connection here since the Push connections
// are using the same HTTP server and upgrade requests and this
// gets triggered for both
return;
}
wsServer.handleUpgrade(request, socket, head, (ws) => {
request.ws = ws;
const response = new ServerResponse(request);
response.writeHead = (statusCode) => {
if (statusCode > 200) ws.close();
return response;
};
// @ts-expect-error Hidden API?
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
app.handle(request, response);
});
});
};
export const setupRunnerHandler = (restEndpoint: string, app: Application) => {
const wsEndpoint = getWsEndpoint(restEndpoint);
const authEndpoint = `${getEndpointBasePath(restEndpoint)}/auth`;
const taskRunnerAuthController = Container.get(TaskRunnerAuthController);
const taskRunnerService = Container.get(TaskRunnerService);
app.use(
wsEndpoint,
// eslint-disable-next-line @typescript-eslint/unbound-method
taskRunnerAuthController.authMiddleware,
(req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) =>
taskRunnerService.handleRequest(req, res),
);
app.post(
authEndpoint,
send(async (req) => await taskRunnerAuthController.createGrantToken(req)),
);
};