refactor(core): Make task broker an explicit component of task runner (#12782)

This commit is contained in:
Tomi Turtiainen 2025-01-31 17:39:06 +02:00 committed by GitHub
parent c7a15d5980
commit b77bf86166
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 185 additions and 143 deletions

View file

@ -2,15 +2,15 @@ import { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { Logger } from 'n8n-core'; import type { Logger } from 'n8n-core';
import type { TaskRunnerAuthService } from '@/task-runners/auth/task-runner-auth.service';
import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service';
import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events'; import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
describe('TaskRunnerProcessRestartLoopDetector', () => { describe('TaskRunnerProcessRestartLoopDetector', () => {
const mockLogger = mock<Logger>(); const mockLogger = mock<Logger>();
const mockAuthService = mock<TaskRunnerAuthService>(); const mockAuthService = mock<TaskBrokerAuthService>();
const runnerConfig = new TaskRunnersConfig(); const runnerConfig = new TaskRunnersConfig();
const taskRunnerProcess = new TaskRunnerProcess( const taskRunnerProcess = new TaskRunnerProcess(
mockLogger, mockLogger,

View file

@ -3,7 +3,7 @@ import { mock } from 'jest-mock-extended';
import { Logger } from 'n8n-core'; import { Logger } from 'n8n-core';
import type { ChildProcess, SpawnOptions } from 'node:child_process'; import type { ChildProcess, SpawnOptions } from 'node:child_process';
import type { TaskRunnerAuthService } from '@/task-runners/auth/task-runner-auth.service'; import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service';
import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
import { mockInstance } from '@test/mocking'; import { mockInstance } from '@test/mocking';
@ -26,7 +26,7 @@ describe('TaskRunnerProcess', () => {
const runnerConfig = mockInstance(TaskRunnersConfig); const runnerConfig = mockInstance(TaskRunnersConfig);
runnerConfig.enabled = true; runnerConfig.enabled = true;
runnerConfig.mode = 'internal'; runnerConfig.mode = 'internal';
const authService = mock<TaskRunnerAuthService>(); const authService = mock<TaskBrokerAuthService>();
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
afterEach(async () => { afterEach(async () => {

View file

@ -1,5 +0,0 @@
import { z } from 'zod';
export const taskRunnerAuthRequestBodySchema = z.object({
token: z.string().min(1),
});

View file

@ -1,10 +1,13 @@
import { Service } from '@n8n/di'; import { Service } from '@n8n/di';
import config from '@/config'; import config from '@/config';
import type {
DisconnectAnalyzer,
DisconnectErrorOptions,
} from '@/task-runners/task-broker/task-broker-types';
import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error'; import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error';
import type { DisconnectAnalyzer, DisconnectErrorOptions } from './task-runner-types';
/** /**
* Analyzes the disconnect reason of a task runner to provide a more * Analyzes the disconnect reason of a task runner to provide a more

View file

@ -1,6 +1,6 @@
import { ApplicationError } from 'n8n-workflow'; import { ApplicationError } from 'n8n-workflow';
import type { TaskRunner } from '../task-broker.service'; import type { TaskRunner } from '@/task-runners/task-broker/task-broker.service';
export class TaskRunnerOomError extends ApplicationError { export class TaskRunnerOomError extends ApplicationError {
description: string; description: string;

View file

@ -1,12 +1,13 @@
import { TaskRunnersConfig } from '@n8n/config'; import { TaskRunnersConfig } from '@n8n/config';
import { Service } from '@n8n/di'; import { Service } from '@n8n/di';
import type { DisconnectErrorOptions } from '@/task-runners/task-broker/task-broker-types';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import { TaskRunnerOomError } from './errors/task-runner-oom-error'; import { TaskRunnerOomError } from './errors/task-runner-oom-error';
import { SlidingWindowSignal } from './sliding-window-signal'; import { SlidingWindowSignal } from './sliding-window-signal';
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process'; import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process';
import { TaskRunnerProcess } from './task-runner-process'; import { TaskRunnerProcess } from './task-runner-process';
import type { DisconnectErrorOptions } from './task-runner-types';
/** /**
* Analyzes the disconnect reason of a task runner process to provide a more * Analyzes the disconnect reason of a task runner process to provide a more

View file

@ -3,24 +3,23 @@ import { mock } from 'jest-mock-extended';
import { ServerResponse } from 'node:http'; import { ServerResponse } from 'node:http';
import type WebSocket from 'ws'; import type WebSocket from 'ws';
import type { TaskRunnerAuthController } from '@/task-runners/auth/task-runner-auth.controller'; import type { TaskBrokerAuthController } from '@/task-runners/task-broker/auth/task-broker-auth.controller';
import { TaskRunnerServer } from '@/task-runners/task-runner-server'; import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server';
import type { TaskBrokerServerInitRequest } from '@/task-runners/task-broker/task-broker-types';
import type { TaskRunnerServerInitRequest } from '../task-runner-types'; describe('TaskBrokerServer', () => {
describe('TaskRunnerServer', () => {
describe('handleUpgradeRequest', () => { describe('handleUpgradeRequest', () => {
it('should close WebSocket when response status code is > 200', () => { it('should close WebSocket when response status code is > 200', () => {
const ws = mock<WebSocket>(); const ws = mock<WebSocket>();
const request = mock<TaskRunnerServerInitRequest>({ const request = mock<TaskBrokerServerInitRequest>({
url: '/runners/_ws', url: '/runners/_ws',
ws, ws,
}); });
const server = new TaskRunnerServer( const server = new TaskBrokerServer(
mock(), mock(),
mock<GlobalConfig>({ taskRunners: { path: '/runners' } }), mock<GlobalConfig>({ taskRunners: { path: '/runners' } }),
mock<TaskRunnerAuthController>(), mock<TaskBrokerAuthController>(),
mock(), mock(),
); );
@ -39,15 +38,15 @@ describe('TaskRunnerServer', () => {
it('should not close WebSocket when response status code is 200', () => { it('should not close WebSocket when response status code is 200', () => {
const ws = mock<WebSocket>(); const ws = mock<WebSocket>();
const request = mock<TaskRunnerServerInitRequest>({ const request = mock<TaskBrokerServerInitRequest>({
url: '/runners/_ws', url: '/runners/_ws',
ws, ws,
}); });
const server = new TaskRunnerServer( const server = new TaskBrokerServer(
mock(), mock(),
mock<GlobalConfig>({ taskRunners: { path: '/runners' } }), mock<GlobalConfig>({ taskRunners: { path: '/runners' } }),
mock<TaskRunnerAuthController>(), mock<TaskBrokerAuthController>(),
mock(), mock(),
); );

View file

@ -3,12 +3,12 @@ import { mock } from 'jest-mock-extended';
import type WebSocket from 'ws'; import type WebSocket from 'ws';
import { Time, WsStatusCodes } from '@/constants'; import { Time, WsStatusCodes } from '@/constants';
import { TaskRunnerWsServer } from '@/task-runners/task-runner-ws-server'; import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
describe('TaskRunnerWsServer', () => { describe('TaskBrokerWsServer', () => {
describe('removeConnection', () => { describe('removeConnection', () => {
it('should close with 1000 status code by default', async () => { it('should close with 1000 status code by default', async () => {
const server = new TaskRunnerWsServer(mock(), mock(), mock(), mock(), mock()); const server = new TaskBrokerWsServer(mock(), mock(), mock(), mock(), mock());
const ws = mock<WebSocket>(); const ws = mock<WebSocket>();
server.runnerConnections.set('test-runner', ws); server.runnerConnections.set('test-runner', ws);
@ -22,7 +22,7 @@ describe('TaskRunnerWsServer', () => {
it('should set up heartbeat timer on server start', async () => { it('should set up heartbeat timer on server start', async () => {
const setIntervalSpy = jest.spyOn(global, 'setInterval'); const setIntervalSpy = jest.spyOn(global, 'setInterval');
const server = new TaskRunnerWsServer( const server = new TaskBrokerWsServer(
mock(), mock(),
mock(), mock(),
mock(), mock(),
@ -44,7 +44,7 @@ describe('TaskRunnerWsServer', () => {
jest.spyOn(global, 'setInterval'); jest.spyOn(global, 'setInterval');
const clearIntervalSpy = jest.spyOn(global, 'clearInterval'); const clearIntervalSpy = jest.spyOn(global, 'clearInterval');
const server = new TaskRunnerWsServer( const server = new TaskBrokerWsServer(
mock(), mock(),
mock(), mock(),
mock(), mock(),
@ -61,7 +61,7 @@ describe('TaskRunnerWsServer', () => {
describe('sendMessage', () => { describe('sendMessage', () => {
it('should work with a message containing circular references', () => { it('should work with a message containing circular references', () => {
const server = new TaskRunnerWsServer(mock(), mock(), mock(), mock(), mock()); const server = new TaskBrokerWsServer(mock(), mock(), mock(), mock(), mock());
const ws = mock<WebSocket>(); const ws = mock<WebSocket>();
server.runnerConnections.set('test-runner', ws); server.runnerConnections.set('test-runner', ws);

View file

@ -4,12 +4,12 @@ import { mock } from 'jest-mock-extended';
import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow'; import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow';
import { Time } from '@/constants'; import { Time } from '@/constants';
import type { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
import { TaskRejectError } from '../errors'; import { TaskRejectError } from '../errors/task-reject.error';
import { TaskRunnerTimeoutError } from '../errors/task-runner-timeout.error'; import { TaskRunnerTimeoutError } from '../errors/task-runner-timeout.error';
import { TaskBroker } from '../task-broker.service'; import { TaskBroker } from '../task-broker.service';
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service'; import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
import type { TaskRunnerLifecycleEvents } from '../task-runner-lifecycle-events';
const createValidUntil = (ms: number) => process.hrtime.bigint() + BigInt(ms * 1_000_000); const createValidUntil = (ms: number) => process.hrtime.bigint() + BigInt(ms * 1_000_000);

View file

@ -5,14 +5,14 @@ import { mock } from 'jest-mock-extended';
import { CacheService } from '@/services/cache/cache.service'; import { CacheService } from '@/services/cache/cache.service';
import { mockInstance } from '@test/mocking'; import { mockInstance } from '@test/mocking';
import { BadRequestError } from '../../../errors/response-errors/bad-request.error'; import { BadRequestError } from '../../../../errors/response-errors/bad-request.error';
import { ForbiddenError } from '../../../errors/response-errors/forbidden.error'; import { ForbiddenError } from '../../../../errors/response-errors/forbidden.error';
import type { AuthlessRequest } from '../../../requests'; import type { AuthlessRequest } from '../../../../requests';
import type { TaskRunnerServerInitRequest } from '../../task-runner-types'; import type { TaskBrokerServerInitRequest } from '../../task-broker-types';
import { TaskRunnerAuthController } from '../task-runner-auth.controller'; import { TaskBrokerAuthController } from '../task-broker-auth.controller';
import { TaskRunnerAuthService } from '../task-runner-auth.service'; import { TaskBrokerAuthService } from '../task-broker-auth.service';
describe('TaskRunnerAuthController', () => { describe('TaskBrokerAuthController', () => {
const globalConfig = mockInstance(GlobalConfig, { const globalConfig = mockInstance(GlobalConfig, {
cache: { cache: {
backend: 'memory', backend: 'memory',
@ -27,8 +27,8 @@ describe('TaskRunnerAuthController', () => {
}); });
const TTL = 100; const TTL = 100;
const cacheService = new CacheService(globalConfig); const cacheService = new CacheService(globalConfig);
const authService = new TaskRunnerAuthService(globalConfig, cacheService, TTL); const authService = new TaskBrokerAuthService(globalConfig, cacheService, TTL);
const authController = new TaskRunnerAuthController(authService); const authController = new TaskBrokerAuthController(authService);
const createMockGrantTokenReq = (token?: string) => const createMockGrantTokenReq = (token?: string) =>
({ ({
@ -71,7 +71,7 @@ describe('TaskRunnerAuthController', () => {
const next = jest.fn() as NextFunction; const next = jest.fn() as NextFunction;
const createMockReqWithToken = (token?: string) => const createMockReqWithToken = (token?: string) =>
mock<TaskRunnerServerInitRequest>({ mock<TaskBrokerServerInitRequest>({
headers: { headers: {
authorization: `Bearer ${token}`, authorization: `Bearer ${token}`,
}, },
@ -82,7 +82,7 @@ describe('TaskRunnerAuthController', () => {
}); });
it('should respond with 401 when grant token is missing', async () => { it('should respond with 401 when grant token is missing', async () => {
const req = mock<TaskRunnerServerInitRequest>({}); const req = mock<TaskBrokerServerInitRequest>({});
await authController.authMiddleware(req, res, next); await authController.authMiddleware(req, res, next);

View file

@ -5,10 +5,10 @@ import config from '@/config';
import { CacheService } from '@/services/cache/cache.service'; import { CacheService } from '@/services/cache/cache.service';
import { retryUntil } from '@test-integration/retry-until'; import { retryUntil } from '@test-integration/retry-until';
import { mockInstance } from '../../../../test/shared/mocking'; import { mockInstance } from '../../../../../test/shared/mocking';
import { TaskRunnerAuthService } from '../task-runner-auth.service'; import { TaskBrokerAuthService } from '../task-broker-auth.service';
describe('TaskRunnerAuthService', () => { describe('TaskBrokerAuthService', () => {
config.set('taskRunners.authToken', 'random-secret'); config.set('taskRunners.authToken', 'random-secret');
const globalConfig = mockInstance(GlobalConfig, { const globalConfig = mockInstance(GlobalConfig, {
@ -25,7 +25,7 @@ describe('TaskRunnerAuthService', () => {
}); });
const TTL = 100; const TTL = 100;
const cacheService = new CacheService(globalConfig); const cacheService = new CacheService(globalConfig);
const authService = new TaskRunnerAuthService(globalConfig, cacheService, TTL); const authService = new TaskBrokerAuthService(globalConfig, cacheService, TTL);
beforeEach(() => { beforeEach(() => {
jest.clearAllMocks(); jest.clearAllMocks();

View file

@ -2,19 +2,19 @@ import { Service } from '@n8n/di';
import type { NextFunction, Response } from 'express'; import type { NextFunction, Response } from 'express';
import type { AuthlessRequest } from '@/requests'; import type { AuthlessRequest } from '@/requests';
import type { TaskBrokerServerInitRequest } from '@/task-runners/task-broker/task-broker-types';
import { taskRunnerAuthRequestBodySchema } from './task-runner-auth.schema'; import { taskBrokerAuthRequestBodySchema } from './task-broker-auth.schema';
import { TaskRunnerAuthService } from './task-runner-auth.service'; import { TaskBrokerAuthService } from './task-broker-auth.service';
import { BadRequestError } from '../../errors/response-errors/bad-request.error'; import { BadRequestError } from '../../../errors/response-errors/bad-request.error';
import { ForbiddenError } from '../../errors/response-errors/forbidden.error'; import { ForbiddenError } from '../../../errors/response-errors/forbidden.error';
import type { TaskRunnerServerInitRequest } from '../task-runner-types';
/** /**
* Controller responsible for authenticating Task Runner connections * Controller responsible for authenticating Task Runner connections
*/ */
@Service() @Service()
export class TaskRunnerAuthController { export class TaskBrokerAuthController {
constructor(private readonly taskRunnerAuthService: TaskRunnerAuthService) { constructor(private readonly authService: TaskBrokerAuthService) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.authMiddleware = this.authMiddleware.bind(this); this.authMiddleware = this.authMiddleware.bind(this);
} }
@ -24,17 +24,17 @@ export class TaskRunnerAuthController {
* which can be used to initiate a task runner connection. * which can be used to initiate a task runner connection.
*/ */
async createGrantToken(req: AuthlessRequest) { async createGrantToken(req: AuthlessRequest) {
const result = await taskRunnerAuthRequestBodySchema.safeParseAsync(req.body); const result = await taskBrokerAuthRequestBodySchema.safeParseAsync(req.body);
if (!result.success) { if (!result.success) {
throw new BadRequestError(result.error.errors[0].code); throw new BadRequestError(result.error.errors[0].code);
} }
const { token: authToken } = result.data; const { token: authToken } = result.data;
if (!this.taskRunnerAuthService.isValidAuthToken(authToken)) { if (!this.authService.isValidAuthToken(authToken)) {
throw new ForbiddenError(); throw new ForbiddenError();
} }
const grantToken = await this.taskRunnerAuthService.createGrantToken(); const grantToken = await this.authService.createGrantToken();
return { return {
token: grantToken, token: grantToken,
}; };
@ -43,7 +43,7 @@ export class TaskRunnerAuthController {
/** /**
* Middleware to authenticate task runner init requests * Middleware to authenticate task runner init requests
*/ */
async authMiddleware(req: TaskRunnerServerInitRequest, res: Response, next: NextFunction) { async authMiddleware(req: TaskBrokerServerInitRequest, res: Response, next: NextFunction) {
const authHeader = req.headers.authorization; const authHeader = req.headers.authorization;
if (typeof authHeader !== 'string' || !authHeader.startsWith('Bearer ')) { if (typeof authHeader !== 'string' || !authHeader.startsWith('Bearer ')) {
res.status(401).json({ code: 401, message: 'Unauthorized' }); res.status(401).json({ code: 401, message: 'Unauthorized' });
@ -51,7 +51,7 @@ export class TaskRunnerAuthController {
} }
const grantToken = authHeader.slice('Bearer '.length); const grantToken = authHeader.slice('Bearer '.length);
const isConsumed = await this.taskRunnerAuthService.tryConsumeGrantToken(grantToken); const isConsumed = await this.authService.tryConsumeGrantToken(grantToken);
if (!isConsumed) { if (!isConsumed) {
res.status(403).json({ code: 403, message: 'Forbidden' }); res.status(403).json({ code: 403, message: 'Forbidden' });
return; return;

View file

@ -0,0 +1,5 @@
import { z } from 'zod';
export const taskBrokerAuthRequestBodySchema = z.object({
token: z.string().min(1),
});

View file

@ -8,7 +8,7 @@ import { CacheService } from '@/services/cache/cache.service';
const GRANT_TOKEN_TTL = 15 * Time.seconds.toMilliseconds; const GRANT_TOKEN_TTL = 15 * Time.seconds.toMilliseconds;
@Service() @Service()
export class TaskRunnerAuthService { export class TaskBrokerAuthService {
private readonly authToken = Buffer.from(this.globalConfig.taskRunners.authToken); private readonly authToken = Buffer.from(this.globalConfig.taskRunners.authToken);
constructor( constructor(

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class TaskDeferredError extends ApplicationError {
constructor() {
super('Task deferred until runner is ready', { level: 'info' });
}
}

View file

@ -5,11 +5,3 @@ export class TaskRejectError extends ApplicationError {
super(`Task rejected with reason: ${reason}`, { level: 'info' }); super(`Task rejected with reason: ${reason}`, { level: 'info' });
} }
} }
export class TaskDeferredError extends ApplicationError {
constructor() {
super('Task deferred until runner is ready', { level: 'info' });
}
}
export class TaskError extends ApplicationError {}

View file

@ -14,18 +14,18 @@ import { Server as WSServer } from 'ws';
import { inTest } from '@/constants'; import { inTest } from '@/constants';
import { bodyParser, rawBodyReader } from '@/middlewares'; import { bodyParser, rawBodyReader } from '@/middlewares';
import { send } from '@/response-helper'; import { send } from '@/response-helper';
import { TaskRunnerAuthController } from '@/task-runners/auth/task-runner-auth.controller'; import { TaskBrokerAuthController } from '@/task-runners/task-broker/auth/task-broker-auth.controller';
import type { import type {
TaskRunnerServerInitRequest, TaskBrokerServerInitRequest,
TaskRunnerServerInitResponse, TaskBrokerServerInitResponse,
} from '@/task-runners/task-runner-types'; } from '@/task-runners/task-broker/task-broker-types';
import { TaskRunnerWsServer } from '@/task-runners/task-runner-ws-server'; import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
/** /**
* Task Runner HTTP & WS server * Task Broker HTTP & WS server
*/ */
@Service() @Service()
export class TaskRunnerServer { export class TaskBrokerServer {
private server: Server | undefined; private server: Server | undefined;
private wsServer: WSServer | undefined; private wsServer: WSServer | undefined;
@ -43,8 +43,8 @@ export class TaskRunnerServer {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly globalConfig: GlobalConfig, private readonly globalConfig: GlobalConfig,
private readonly taskRunnerAuthController: TaskRunnerAuthController, private readonly authController: TaskBrokerAuthController,
private readonly taskRunnerWsServer: TaskRunnerWsServer, private readonly taskBrokerWsServer: TaskBrokerWsServer,
) { ) {
this.app = express(); this.app = express();
this.app.disable('x-powered-by'); this.app.disable('x-powered-by');
@ -82,7 +82,7 @@ export class TaskRunnerServer {
} }
})(); })();
const stopWsServerTask = this.taskRunnerWsServer.stop(); const stopWsServerTask = this.taskBrokerWsServer.stop();
await Promise.all([stopHttpServerTask, stopWsServerTask]); await Promise.all([stopHttpServerTask, stopWsServerTask]);
} }
@ -126,7 +126,7 @@ export class TaskRunnerServer {
}); });
this.server.on('upgrade', this.handleUpgradeRequest); this.server.on('upgrade', this.handleUpgradeRequest);
this.taskRunnerWsServer.start(); this.taskBrokerWsServer.start();
} }
private async setupErrorHandlers() { private async setupErrorHandlers() {
@ -159,23 +159,23 @@ export class TaskRunnerServer {
this.upgradeEndpoint, this.upgradeEndpoint,
createRateLimiter(), createRateLimiter(),
// eslint-disable-next-line @typescript-eslint/unbound-method // eslint-disable-next-line @typescript-eslint/unbound-method
this.taskRunnerAuthController.authMiddleware, this.authController.authMiddleware,
(req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) => (req: TaskBrokerServerInitRequest, res: TaskBrokerServerInitResponse) =>
this.taskRunnerWsServer.handleRequest(req, res), this.taskBrokerWsServer.handleRequest(req, res),
); );
const authEndpoint = `${this.getEndpointBasePath()}/auth`; const authEndpoint = `${this.getEndpointBasePath()}/auth`;
this.app.post( this.app.post(
authEndpoint, authEndpoint,
createRateLimiter(), createRateLimiter(),
send(async (req) => await this.taskRunnerAuthController.createGrantToken(req)), send(async (req) => await this.authController.createGrantToken(req)),
); );
this.app.get('/healthz', (_, res) => res.send({ status: 'ok' })); this.app.get('/healthz', (_, res) => res.send({ status: 'ok' }));
} }
private handleUpgradeRequest = ( private handleUpgradeRequest = (
request: TaskRunnerServerInitRequest, request: TaskBrokerServerInitRequest,
socket: Socket, socket: Socket,
head: Buffer, head: Buffer,
) => { ) => {

View file

@ -1,9 +1,8 @@
import type { TaskRunner } from '@n8n/task-runner';
import type { Response } from 'express'; import type { Response } from 'express';
import type { INodeExecutionData } from 'n8n-workflow';
import type WebSocket from 'ws'; import type WebSocket from 'ws';
import type { TaskRunner } from './task-broker.service'; import type { AuthlessRequest } from '../../requests';
import type { AuthlessRequest } from '../requests';
export interface DisconnectAnalyzer { export interface DisconnectAnalyzer {
isCloudDeployment: boolean; isCloudDeployment: boolean;
@ -11,19 +10,12 @@ export interface DisconnectAnalyzer {
toDisconnectError(opts: DisconnectErrorOptions): Promise<Error>; toDisconnectError(opts: DisconnectErrorOptions): Promise<Error>;
} }
export type DataRequestType = 'input' | 'node' | 'all'; export interface TaskBrokerServerInitRequest
export interface TaskResultData {
result: INodeExecutionData[];
customData?: Record<string, string>;
}
export interface TaskRunnerServerInitRequest
extends AuthlessRequest<{}, {}, {}, { id: TaskRunner['id']; token?: string }> { extends AuthlessRequest<{}, {}, {}, { id: TaskRunner['id']; token?: string }> {
ws: WebSocket; ws: WebSocket;
} }
export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest }; export type TaskBrokerServerInitResponse = Response & { req: TaskBrokerServerInitRequest };
export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown'; export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown';

View file

@ -6,16 +6,16 @@ import { ApplicationError, jsonStringify } from 'n8n-workflow';
import type WebSocket from 'ws'; import type WebSocket from 'ws';
import { Time, WsStatusCodes } from '@/constants'; import { Time, WsStatusCodes } from '@/constants';
import { DefaultTaskRunnerDisconnectAnalyzer } from '@/task-runners/default-task-runner-disconnect-analyzer';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service';
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
import type { import type {
DisconnectAnalyzer, DisconnectAnalyzer,
DisconnectReason, DisconnectReason,
TaskRunnerServerInitRequest, TaskBrokerServerInitRequest,
TaskRunnerServerInitResponse, TaskBrokerServerInitResponse,
} from './task-runner-types'; } from '@/task-runners/task-broker/task-broker-types';
import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service';
function heartbeat(this: WebSocket) { function heartbeat(this: WebSocket) {
this.isAlive = true; this.isAlive = true;
@ -23,8 +23,12 @@ function heartbeat(this: WebSocket) {
type WsStatusCode = (typeof WsStatusCodes)[keyof typeof WsStatusCodes]; type WsStatusCode = (typeof WsStatusCodes)[keyof typeof WsStatusCodes];
/**
* Responsible for handling WebSocket connections with task runners
* and monitoring the connection liveness
*/
@Service() @Service()
export class TaskRunnerWsServer { export class TaskBrokerWsServer {
runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map(); runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();
private heartbeatTimer: NodeJS.Timer | undefined; private heartbeatTimer: NodeJS.Timer | undefined;
@ -164,7 +168,7 @@ export class TaskRunnerWsServer {
} }
} }
handleRequest(req: TaskRunnerServerInitRequest, _res: TaskRunnerServerInitResponse) { handleRequest(req: TaskBrokerServerInitRequest, _res: TaskBrokerServerInitResponse) {
this.add(req.query.id, req.ws); this.add(req.query.id, req.ws);
} }

View file

@ -12,10 +12,10 @@ import { nanoid } from 'nanoid';
import config from '@/config'; import config from '@/config';
import { Time } from '@/constants'; import { Time } from '@/constants';
import { TaskDeferredError } from '@/task-runners/task-broker/errors/task-deferred.error';
import { TaskDeferredError, TaskRejectError } from './errors'; import { TaskRejectError } from '@/task-runners/task-broker/errors/task-reject.error';
import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error'; import { TaskRunnerTimeoutError } from '@/task-runners/task-broker/errors/task-runner-timeout.error';
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
export interface TaskRunner { export interface TaskRunner {
id: string; id: string;

View file

@ -2,10 +2,10 @@ import { Container, Service } from '@n8n/di';
import type { RequesterMessage } from '@n8n/task-runner'; import type { RequesterMessage } from '@n8n/task-runner';
import { NodeTypes } from '@/node-types'; import { NodeTypes } from '@/node-types';
import type { RequesterMessageCallback } from '@/task-runners/task-broker/task-broker.service';
import { TaskBroker } from '@/task-runners/task-broker/task-broker.service';
import { TaskRequester } from './task-requester'; import { TaskRequester } from './task-requester';
import type { RequesterMessageCallback } from '../task-broker.service';
import { TaskBroker } from '../task-broker.service';
@Service() @Service()
export class LocalTaskRequester extends TaskRequester { export class LocalTaskRequester extends TaskRequester {

View file

@ -6,13 +6,13 @@ import * as a from 'node:assert/strict';
import { OnShutdown } from '@/decorators/on-shutdown'; import { OnShutdown } from '@/decorators/on-shutdown';
import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
import type { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import type { TaskRunnerProcess } from '@/task-runners/task-runner-process';
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
import { MissingAuthTokenError } from './errors/missing-auth-token.error'; import { MissingAuthTokenError } from './errors/missing-auth-token.error';
import type { TaskBrokerServer } from './task-broker/task-broker-server';
import type { LocalTaskRequester } from './task-managers/local-task-requester'; import type { LocalTaskRequester } from './task-managers/local-task-requester';
import type { TaskRunnerServer } from './task-runner-server';
import { TaskRunnerWsServer } from './task-runner-ws-server';
/** /**
* Module responsible for loading and starting task runner. Task runner can be * Module responsible for loading and starting task runner. Task runner can be
@ -21,9 +21,9 @@ import { TaskRunnerWsServer } from './task-runner-ws-server';
*/ */
@Service() @Service()
export class TaskRunnerModule { export class TaskRunnerModule {
private taskRunnerHttpServer: TaskRunnerServer | undefined; private taskBrokerHttpServer: TaskBrokerServer | undefined;
private taskRunnerWsServer: TaskRunnerWsServer | undefined; private taskBrokerWsServer: TaskBrokerWsServer | undefined;
private taskRequester: LocalTaskRequester | undefined; private taskRequester: LocalTaskRequester | undefined;
@ -47,7 +47,7 @@ export class TaskRunnerModule {
if (mode === 'external' && !authToken) throw new MissingAuthTokenError(); if (mode === 'external' && !authToken) throw new MissingAuthTokenError();
await this.loadTaskRequester(); await this.loadTaskRequester();
await this.loadTaskRunnerServer(); await this.loadTaskBroker();
if (mode === 'internal') { if (mode === 'internal') {
await this.startInternalTaskRunner(); await this.startInternalTaskRunner();
@ -64,9 +64,9 @@ export class TaskRunnerModule {
})(); })();
const stopRunnerServerTask = (async () => { const stopRunnerServerTask = (async () => {
if (this.taskRunnerHttpServer) { if (this.taskBrokerHttpServer) {
await this.taskRunnerHttpServer.stop(); await this.taskBrokerHttpServer.stop();
this.taskRunnerHttpServer = undefined; this.taskBrokerHttpServer = undefined;
} }
})(); })();
@ -82,18 +82,18 @@ export class TaskRunnerModule {
Container.set(TaskRequester, this.taskRequester); Container.set(TaskRequester, this.taskRequester);
} }
private async loadTaskRunnerServer() { private async loadTaskBroker() {
// These are imported dynamically because we need to set the task manager // These are imported dynamically because we need to set the task manager
// instance before importing them // instance before importing them
const { TaskRunnerServer } = await import('@/task-runners/task-runner-server'); const { TaskBrokerServer } = await import('@/task-runners/task-broker/task-broker-server');
this.taskRunnerHttpServer = Container.get(TaskRunnerServer); this.taskBrokerHttpServer = Container.get(TaskBrokerServer);
this.taskRunnerWsServer = Container.get(TaskRunnerWsServer); this.taskBrokerWsServer = Container.get(TaskBrokerWsServer);
await this.taskRunnerHttpServer.start(); await this.taskBrokerHttpServer.start();
} }
private async startInternalTaskRunner() { private async startInternalTaskRunner() {
a.ok(this.taskRunnerWsServer, 'Task Runner WS Server not loaded'); a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded');
const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process'); const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process');
this.taskRunnerProcess = Container.get(TaskRunnerProcess); this.taskRunnerProcess = Container.get(TaskRunnerProcess);
@ -110,7 +110,7 @@ export class TaskRunnerModule {
const { InternalTaskRunnerDisconnectAnalyzer } = await import( const { InternalTaskRunnerDisconnectAnalyzer } = await import(
'@/task-runners/internal-task-runner-disconnect-analyzer' '@/task-runners/internal-task-runner-disconnect-analyzer'
); );
this.taskRunnerWsServer.setDisconnectAnalyzer( this.taskBrokerWsServer.setDisconnectAnalyzer(
Container.get(InternalTaskRunnerDisconnectAnalyzer), Container.get(InternalTaskRunnerDisconnectAnalyzer),
); );
} }

View file

@ -7,9 +7,9 @@ import * as process from 'node:process';
import { OnShutdown } from '@/decorators/on-shutdown'; import { OnShutdown } from '@/decorators/on-shutdown';
import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
import { forwardToLogger } from './forward-to-logger'; import { forwardToLogger } from './forward-to-logger';
import { NodeProcessOomDetector } from './node-process-oom-detector'; import { NodeProcessOomDetector } from './node-process-oom-detector';
import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service';
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
import { TypedEmitter } from '../typed-emitter'; import { TypedEmitter } from '../typed-emitter';
@ -68,7 +68,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
constructor( constructor(
logger: Logger, logger: Logger,
private readonly runnerConfig: TaskRunnersConfig, private readonly runnerConfig: TaskRunnersConfig,
private readonly authService: TaskRunnerAuthService, private readonly authService: TaskBrokerAuthService,
private readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, private readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents,
) { ) {
super(); super();

View file

@ -17,8 +17,8 @@ import { Publisher } from '@/scaling/pubsub/publisher.service';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { ScalingService } from '@/scaling/scaling.service'; import { ScalingService } from '@/scaling/scaling.service';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server';
import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
import { TaskRunnerServer } from '@/task-runners/task-runner-server';
import { Telemetry } from '@/telemetry'; import { Telemetry } from '@/telemetry';
import { setupTestCommand } from '@test-integration/utils/test-command'; import { setupTestCommand } from '@test-integration/utils/test-command';
@ -36,7 +36,7 @@ const messageEventBus = mockInstance(MessageEventBus);
const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay);
const scalingService = mockInstance(ScalingService); const scalingService = mockInstance(ScalingService);
const orchestrationService = mockInstance(OrchestrationService); const orchestrationService = mockInstance(OrchestrationService);
const taskRunnerServer = mockInstance(TaskRunnerServer); const taskBrokerServer = mockInstance(TaskBrokerServer);
const taskRunnerProcess = mockInstance(TaskRunnerProcess); const taskRunnerProcess = mockInstance(TaskRunnerProcess);
mockInstance(Publisher); mockInstance(Publisher);
mockInstance(Subscriber); mockInstance(Subscriber);
@ -60,7 +60,7 @@ test('worker initializes all its components', async () => {
expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1);
expect(orchestrationService.init).toHaveBeenCalledTimes(1); expect(orchestrationService.init).toHaveBeenCalledTimes(1);
expect(messageEventBus.send).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1);
expect(taskRunnerServer.start).toHaveBeenCalledTimes(1); expect(taskBrokerServer.start).toHaveBeenCalledTimes(1);
expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1); expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1);
expect(config.getEnv('executions.mode')).toBe('queue'); expect(config.getEnv('executions.mode')).toBe('queue');

View file

@ -3,10 +3,10 @@ import { Container } from '@n8n/di';
import request from 'supertest'; import request from 'supertest';
import type TestAgent from 'supertest/lib/agent'; import type TestAgent from 'supertest/lib/agent';
import { TaskRunnerServer } from '@/task-runners/task-runner-server'; import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server';
export interface TestTaskBrokerServer { export interface TestTaskBrokerServer {
server: TaskRunnerServer; server: TaskBrokerServer;
agent: TestAgent; agent: TestAgent;
config: TaskRunnersConfig; config: TaskRunnersConfig;
} }
@ -29,11 +29,11 @@ export const setupBrokerTestServer = (
runnerConfig.enabled = true; runnerConfig.enabled = true;
runnerConfig.port = 0; // Use any port runnerConfig.port = 0; // Use any port
const taskRunnerServer = Container.get(TaskRunnerServer); const taskBrokerServer = Container.get(TaskBrokerServer);
const agent = request.agent(taskRunnerServer.app); const agent = request.agent(taskBrokerServer.app);
return { return {
server: taskRunnerServer, server: taskBrokerServer,
agent, agent,
config: runnerConfig, config: runnerConfig,
}; };

View file

@ -0,0 +1,44 @@
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
describe('TaskBrokerServer', () => {
const { agent, server } = setupBrokerTestServer({
authToken: 'token',
mode: 'external',
});
beforeAll(async () => {
await server.start();
});
afterAll(async () => {
await server.stop();
});
describe('/healthz', () => {
it('should return 200', async () => {
await agent.get('/healthz').expect(200);
});
});
describe('/runners/_ws', () => {
it('should return 429 when too many requests are made', async () => {
await agent.post('/runners/_ws').send({}).expect(401);
await agent.post('/runners/_ws').send({}).expect(401);
await agent.post('/runners/_ws').send({}).expect(401);
await agent.post('/runners/_ws').send({}).expect(401);
await agent.post('/runners/_ws').send({}).expect(401);
await agent.post('/runners/_ws').send({}).expect(429);
});
});
describe('/runners/auth', () => {
it('should return 429 when too many requests are made', async () => {
await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403);
await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403);
await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403);
await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403);
await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403);
await agent.post('/runners/auth').send({ token: 'invalid' }).expect(429);
});
});
});

View file

@ -3,10 +3,10 @@ import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import { MissingAuthTokenError } from '@/task-runners/errors/missing-auth-token.error'; import { MissingAuthTokenError } from '@/task-runners/errors/missing-auth-token.error';
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
import { TaskRunnerModule } from '@/task-runners/task-runner-module'; import { TaskRunnerModule } from '@/task-runners/task-runner-module';
import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/task-runners/default-task-runner-disconnect-analyzer'; import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/task-runners/default-task-runner-disconnect-analyzer';
import { TaskRunnerWsServer } from '../../../src/task-runners/task-runner-ws-server';
describe('TaskRunnerModule in external mode', () => { describe('TaskRunnerModule in external mode', () => {
const runnerConfig = Container.get(TaskRunnersConfig); const runnerConfig = Container.get(TaskRunnersConfig);
@ -46,7 +46,7 @@ describe('TaskRunnerModule in external mode', () => {
}); });
it('should use DefaultTaskRunnerDisconnectAnalyzer', () => { it('should use DefaultTaskRunnerDisconnectAnalyzer', () => {
const wsServer = Container.get(TaskRunnerWsServer); const wsServer = Container.get(TaskBrokerWsServer);
expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(DefaultTaskRunnerDisconnectAnalyzer); expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(DefaultTaskRunnerDisconnectAnalyzer);
}); });

View file

@ -1,10 +1,10 @@
import { TaskRunnersConfig } from '@n8n/config'; import { TaskRunnersConfig } from '@n8n/config';
import { Container } from '@n8n/di'; import { Container } from '@n8n/di';
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
import { TaskRunnerModule } from '@/task-runners/task-runner-module'; import { TaskRunnerModule } from '@/task-runners/task-runner-module';
import { InternalTaskRunnerDisconnectAnalyzer } from '../../../src/task-runners/internal-task-runner-disconnect-analyzer'; import { InternalTaskRunnerDisconnectAnalyzer } from '../../../src/task-runners/internal-task-runner-disconnect-analyzer';
import { TaskRunnerWsServer } from '../../../src/task-runners/task-runner-ws-server';
describe('TaskRunnerModule in internal mode', () => { describe('TaskRunnerModule in internal mode', () => {
const runnerConfig = Container.get(TaskRunnersConfig); const runnerConfig = Container.get(TaskRunnersConfig);
@ -33,7 +33,7 @@ describe('TaskRunnerModule in internal mode', () => {
}); });
it('should use InternalTaskRunnerDisconnectAnalyzer', () => { it('should use InternalTaskRunnerDisconnectAnalyzer', () => {
const wsServer = Container.get(TaskRunnerWsServer); const wsServer = Container.get(TaskBrokerWsServer);
expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(InternalTaskRunnerDisconnectAnalyzer); expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(InternalTaskRunnerDisconnectAnalyzer);
}); });

View file

@ -1,9 +1,9 @@
import { Container } from '@n8n/di'; import { Container } from '@n8n/di';
import { TaskBroker } from '@/task-runners/task-broker.service'; import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
import { TaskBroker } from '@/task-runners/task-broker/task-broker.service';
import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process';
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
import { TaskRunnerWsServer } from '@/task-runners/task-runner-ws-server';
import { retryUntil } from '@test-integration/retry-until'; import { retryUntil } from '@test-integration/retry-until';
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
@ -13,7 +13,7 @@ describe('TaskRunnerProcess', () => {
}); });
const runnerProcess = Container.get(TaskRunnerProcess); const runnerProcess = Container.get(TaskRunnerProcess);
const taskBroker = Container.get(TaskBroker); const taskBroker = Container.get(TaskBroker);
const taskRunnerService = Container.get(TaskRunnerWsServer); const taskRunnerService = Container.get(TaskBrokerWsServer);
beforeAll(async () => { beforeAll(async () => {
await taskRunnerServer.start(); await taskRunnerServer.start();

View file

@ -1,6 +1,6 @@
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
describe('TaskRunnerServer', () => { describe('TaskBrokerServer', () => {
const { agent, server } = setupBrokerTestServer({ const { agent, server } = setupBrokerTestServer({
authToken: 'token', authToken: 'token',
mode: 'external', mode: 'external',