diff --git a/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts b/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts index bf3bab4c27..808cc1cb93 100644 --- a/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts +++ b/packages/cli/src/task-runners/__tests__/task-runner-process-restart-loop-detector.test.ts @@ -2,15 +2,15 @@ import { TaskRunnersConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import type { Logger } from 'n8n-core'; -import type { TaskRunnerAuthService } from '@/task-runners/auth/task-runner-auth.service'; import { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; +import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service'; import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; describe('TaskRunnerProcessRestartLoopDetector', () => { const mockLogger = mock(); - const mockAuthService = mock(); + const mockAuthService = mock(); const runnerConfig = new TaskRunnersConfig(); const taskRunnerProcess = new TaskRunnerProcess( mockLogger, diff --git a/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts b/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts index 7bf475bf7c..0d08ecd1c2 100644 --- a/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/task-runners/__tests__/task-runner-process.test.ts @@ -3,7 +3,7 @@ import { mock } from 'jest-mock-extended'; import { Logger } from 'n8n-core'; import type { ChildProcess, SpawnOptions } from 'node:child_process'; -import type { TaskRunnerAuthService } from '@/task-runners/auth/task-runner-auth.service'; +import type { TaskBrokerAuthService } from '@/task-runners/task-broker/auth/task-broker-auth.service'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { mockInstance } from '@test/mocking'; @@ -26,7 +26,7 @@ describe('TaskRunnerProcess', () => { const runnerConfig = mockInstance(TaskRunnersConfig); runnerConfig.enabled = true; runnerConfig.mode = 'internal'; - const authService = mock(); + const authService = mock(); let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); afterEach(async () => { diff --git a/packages/cli/src/task-runners/auth/task-runner-auth.schema.ts b/packages/cli/src/task-runners/auth/task-runner-auth.schema.ts deleted file mode 100644 index c3ab2c17f2..0000000000 --- a/packages/cli/src/task-runners/auth/task-runner-auth.schema.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { z } from 'zod'; - -export const taskRunnerAuthRequestBodySchema = z.object({ - token: z.string().min(1), -}); diff --git a/packages/cli/src/task-runners/default-task-runner-disconnect-analyzer.ts b/packages/cli/src/task-runners/default-task-runner-disconnect-analyzer.ts index a0193c40e0..1743c531a4 100644 --- a/packages/cli/src/task-runners/default-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/task-runners/default-task-runner-disconnect-analyzer.ts @@ -1,10 +1,13 @@ import { Service } from '@n8n/di'; import config from '@/config'; +import type { + DisconnectAnalyzer, + DisconnectErrorOptions, +} from '@/task-runners/task-broker/task-broker-types'; import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error'; -import type { DisconnectAnalyzer, DisconnectErrorOptions } from './task-runner-types'; /** * Analyzes the disconnect reason of a task runner to provide a more diff --git a/packages/cli/src/task-runners/errors/task-runner-oom-error.ts b/packages/cli/src/task-runners/errors/task-runner-oom-error.ts index 5c78bef816..412cf6e626 100644 --- a/packages/cli/src/task-runners/errors/task-runner-oom-error.ts +++ b/packages/cli/src/task-runners/errors/task-runner-oom-error.ts @@ -1,6 +1,6 @@ import { ApplicationError } from 'n8n-workflow'; -import type { TaskRunner } from '../task-broker.service'; +import type { TaskRunner } from '@/task-runners/task-broker/task-broker.service'; export class TaskRunnerOomError extends ApplicationError { description: string; diff --git a/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts b/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts index a84682e521..386cab23fd 100644 --- a/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/task-runners/internal-task-runner-disconnect-analyzer.ts @@ -1,12 +1,13 @@ import { TaskRunnersConfig } from '@n8n/config'; import { Service } from '@n8n/di'; +import type { DisconnectErrorOptions } from '@/task-runners/task-broker/task-broker-types'; + import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { TaskRunnerOomError } from './errors/task-runner-oom-error'; import { SlidingWindowSignal } from './sliding-window-signal'; import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process'; import { TaskRunnerProcess } from './task-runner-process'; -import type { DisconnectErrorOptions } from './task-runner-types'; /** * Analyzes the disconnect reason of a task runner process to provide a more diff --git a/packages/cli/src/task-runners/__tests__/task-runner-server.test.ts b/packages/cli/src/task-runners/task-broker/__tests__/task-broker-server.test.ts similarity index 70% rename from packages/cli/src/task-runners/__tests__/task-runner-server.test.ts rename to packages/cli/src/task-runners/task-broker/__tests__/task-broker-server.test.ts index 33de18c605..a474bbecc0 100644 --- a/packages/cli/src/task-runners/__tests__/task-runner-server.test.ts +++ b/packages/cli/src/task-runners/task-broker/__tests__/task-broker-server.test.ts @@ -3,24 +3,23 @@ import { mock } from 'jest-mock-extended'; import { ServerResponse } from 'node:http'; import type WebSocket from 'ws'; -import type { TaskRunnerAuthController } from '@/task-runners/auth/task-runner-auth.controller'; -import { TaskRunnerServer } from '@/task-runners/task-runner-server'; +import type { TaskBrokerAuthController } from '@/task-runners/task-broker/auth/task-broker-auth.controller'; +import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server'; +import type { TaskBrokerServerInitRequest } from '@/task-runners/task-broker/task-broker-types'; -import type { TaskRunnerServerInitRequest } from '../task-runner-types'; - -describe('TaskRunnerServer', () => { +describe('TaskBrokerServer', () => { describe('handleUpgradeRequest', () => { it('should close WebSocket when response status code is > 200', () => { const ws = mock(); - const request = mock({ + const request = mock({ url: '/runners/_ws', ws, }); - const server = new TaskRunnerServer( + const server = new TaskBrokerServer( mock(), mock({ taskRunners: { path: '/runners' } }), - mock(), + mock(), mock(), ); @@ -39,15 +38,15 @@ describe('TaskRunnerServer', () => { it('should not close WebSocket when response status code is 200', () => { const ws = mock(); - const request = mock({ + const request = mock({ url: '/runners/_ws', ws, }); - const server = new TaskRunnerServer( + const server = new TaskBrokerServer( mock(), mock({ taskRunners: { path: '/runners' } }), - mock(), + mock(), mock(), ); diff --git a/packages/cli/src/task-runners/__tests__/task-runner-ws-server.test.ts b/packages/cli/src/task-runners/task-broker/__tests__/task-broker-ws-server.test.ts similarity index 86% rename from packages/cli/src/task-runners/__tests__/task-runner-ws-server.test.ts rename to packages/cli/src/task-runners/task-broker/__tests__/task-broker-ws-server.test.ts index cabedc530b..2a4dc85b42 100644 --- a/packages/cli/src/task-runners/__tests__/task-runner-ws-server.test.ts +++ b/packages/cli/src/task-runners/task-broker/__tests__/task-broker-ws-server.test.ts @@ -3,12 +3,12 @@ import { mock } from 'jest-mock-extended'; import type WebSocket from 'ws'; import { Time, WsStatusCodes } from '@/constants'; -import { TaskRunnerWsServer } from '@/task-runners/task-runner-ws-server'; +import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; -describe('TaskRunnerWsServer', () => { +describe('TaskBrokerWsServer', () => { describe('removeConnection', () => { it('should close with 1000 status code by default', async () => { - const server = new TaskRunnerWsServer(mock(), mock(), mock(), mock(), mock()); + const server = new TaskBrokerWsServer(mock(), mock(), mock(), mock(), mock()); const ws = mock(); server.runnerConnections.set('test-runner', ws); @@ -22,7 +22,7 @@ describe('TaskRunnerWsServer', () => { it('should set up heartbeat timer on server start', async () => { const setIntervalSpy = jest.spyOn(global, 'setInterval'); - const server = new TaskRunnerWsServer( + const server = new TaskBrokerWsServer( mock(), mock(), mock(), @@ -44,7 +44,7 @@ describe('TaskRunnerWsServer', () => { jest.spyOn(global, 'setInterval'); const clearIntervalSpy = jest.spyOn(global, 'clearInterval'); - const server = new TaskRunnerWsServer( + const server = new TaskBrokerWsServer( mock(), mock(), mock(), @@ -61,7 +61,7 @@ describe('TaskRunnerWsServer', () => { describe('sendMessage', () => { it('should work with a message containing circular references', () => { - const server = new TaskRunnerWsServer(mock(), mock(), mock(), mock(), mock()); + const server = new TaskBrokerWsServer(mock(), mock(), mock(), mock(), mock()); const ws = mock(); server.runnerConnections.set('test-runner', ws); diff --git a/packages/cli/src/task-runners/__tests__/task-broker.test.ts b/packages/cli/src/task-runners/task-broker/__tests__/task-broker.service.test.ts similarity index 99% rename from packages/cli/src/task-runners/__tests__/task-broker.test.ts rename to packages/cli/src/task-runners/task-broker/__tests__/task-broker.service.test.ts index ced7e1c07e..90261acdc9 100644 --- a/packages/cli/src/task-runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/task-runners/task-broker/__tests__/task-broker.service.test.ts @@ -4,12 +4,12 @@ import { mock } from 'jest-mock-extended'; import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow'; import { Time } from '@/constants'; +import type { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events'; -import { TaskRejectError } from '../errors'; +import { TaskRejectError } from '../errors/task-reject.error'; import { TaskRunnerTimeoutError } from '../errors/task-runner-timeout.error'; import { TaskBroker } from '../task-broker.service'; import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service'; -import type { TaskRunnerLifecycleEvents } from '../task-runner-lifecycle-events'; const createValidUntil = (ms: number) => process.hrtime.bigint() + BigInt(ms * 1_000_000); diff --git a/packages/cli/src/task-runners/auth/__tests__/task-runner-auth.controller.test.ts b/packages/cli/src/task-runners/task-broker/auth/__tests__/task-broker-auth.controller.test.ts similarity index 80% rename from packages/cli/src/task-runners/auth/__tests__/task-runner-auth.controller.test.ts rename to packages/cli/src/task-runners/task-broker/auth/__tests__/task-broker-auth.controller.test.ts index 3c650d1644..1ff547923c 100644 --- a/packages/cli/src/task-runners/auth/__tests__/task-runner-auth.controller.test.ts +++ b/packages/cli/src/task-runners/task-broker/auth/__tests__/task-broker-auth.controller.test.ts @@ -5,14 +5,14 @@ import { mock } from 'jest-mock-extended'; import { CacheService } from '@/services/cache/cache.service'; import { mockInstance } from '@test/mocking'; -import { BadRequestError } from '../../../errors/response-errors/bad-request.error'; -import { ForbiddenError } from '../../../errors/response-errors/forbidden.error'; -import type { AuthlessRequest } from '../../../requests'; -import type { TaskRunnerServerInitRequest } from '../../task-runner-types'; -import { TaskRunnerAuthController } from '../task-runner-auth.controller'; -import { TaskRunnerAuthService } from '../task-runner-auth.service'; +import { BadRequestError } from '../../../../errors/response-errors/bad-request.error'; +import { ForbiddenError } from '../../../../errors/response-errors/forbidden.error'; +import type { AuthlessRequest } from '../../../../requests'; +import type { TaskBrokerServerInitRequest } from '../../task-broker-types'; +import { TaskBrokerAuthController } from '../task-broker-auth.controller'; +import { TaskBrokerAuthService } from '../task-broker-auth.service'; -describe('TaskRunnerAuthController', () => { +describe('TaskBrokerAuthController', () => { const globalConfig = mockInstance(GlobalConfig, { cache: { backend: 'memory', @@ -27,8 +27,8 @@ describe('TaskRunnerAuthController', () => { }); const TTL = 100; const cacheService = new CacheService(globalConfig); - const authService = new TaskRunnerAuthService(globalConfig, cacheService, TTL); - const authController = new TaskRunnerAuthController(authService); + const authService = new TaskBrokerAuthService(globalConfig, cacheService, TTL); + const authController = new TaskBrokerAuthController(authService); const createMockGrantTokenReq = (token?: string) => ({ @@ -71,7 +71,7 @@ describe('TaskRunnerAuthController', () => { const next = jest.fn() as NextFunction; const createMockReqWithToken = (token?: string) => - mock({ + mock({ headers: { authorization: `Bearer ${token}`, }, @@ -82,7 +82,7 @@ describe('TaskRunnerAuthController', () => { }); it('should respond with 401 when grant token is missing', async () => { - const req = mock({}); + const req = mock({}); await authController.authMiddleware(req, res, next); diff --git a/packages/cli/src/task-runners/auth/__tests__/task-runner-auth.service.test.ts b/packages/cli/src/task-runners/task-broker/auth/__tests__/task-broker-auth.service.test.ts similarity index 91% rename from packages/cli/src/task-runners/auth/__tests__/task-runner-auth.service.test.ts rename to packages/cli/src/task-runners/task-broker/auth/__tests__/task-broker-auth.service.test.ts index 66581ffce0..4585f80b5b 100644 --- a/packages/cli/src/task-runners/auth/__tests__/task-runner-auth.service.test.ts +++ b/packages/cli/src/task-runners/task-broker/auth/__tests__/task-broker-auth.service.test.ts @@ -5,10 +5,10 @@ import config from '@/config'; import { CacheService } from '@/services/cache/cache.service'; import { retryUntil } from '@test-integration/retry-until'; -import { mockInstance } from '../../../../test/shared/mocking'; -import { TaskRunnerAuthService } from '../task-runner-auth.service'; +import { mockInstance } from '../../../../../test/shared/mocking'; +import { TaskBrokerAuthService } from '../task-broker-auth.service'; -describe('TaskRunnerAuthService', () => { +describe('TaskBrokerAuthService', () => { config.set('taskRunners.authToken', 'random-secret'); const globalConfig = mockInstance(GlobalConfig, { @@ -25,7 +25,7 @@ describe('TaskRunnerAuthService', () => { }); const TTL = 100; const cacheService = new CacheService(globalConfig); - const authService = new TaskRunnerAuthService(globalConfig, cacheService, TTL); + const authService = new TaskBrokerAuthService(globalConfig, cacheService, TTL); beforeEach(() => { jest.clearAllMocks(); diff --git a/packages/cli/src/task-runners/auth/task-runner-auth.controller.ts b/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.controller.ts similarity index 60% rename from packages/cli/src/task-runners/auth/task-runner-auth.controller.ts rename to packages/cli/src/task-runners/task-broker/auth/task-broker-auth.controller.ts index 4ea5f3b6f4..e9fd61c17b 100644 --- a/packages/cli/src/task-runners/auth/task-runner-auth.controller.ts +++ b/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.controller.ts @@ -2,19 +2,19 @@ import { Service } from '@n8n/di'; import type { NextFunction, Response } from 'express'; import type { AuthlessRequest } from '@/requests'; +import type { TaskBrokerServerInitRequest } from '@/task-runners/task-broker/task-broker-types'; -import { taskRunnerAuthRequestBodySchema } from './task-runner-auth.schema'; -import { TaskRunnerAuthService } from './task-runner-auth.service'; -import { BadRequestError } from '../../errors/response-errors/bad-request.error'; -import { ForbiddenError } from '../../errors/response-errors/forbidden.error'; -import type { TaskRunnerServerInitRequest } from '../task-runner-types'; +import { taskBrokerAuthRequestBodySchema } from './task-broker-auth.schema'; +import { TaskBrokerAuthService } from './task-broker-auth.service'; +import { BadRequestError } from '../../../errors/response-errors/bad-request.error'; +import { ForbiddenError } from '../../../errors/response-errors/forbidden.error'; /** * Controller responsible for authenticating Task Runner connections */ @Service() -export class TaskRunnerAuthController { - constructor(private readonly taskRunnerAuthService: TaskRunnerAuthService) { +export class TaskBrokerAuthController { + constructor(private readonly authService: TaskBrokerAuthService) { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment this.authMiddleware = this.authMiddleware.bind(this); } @@ -24,17 +24,17 @@ export class TaskRunnerAuthController { * which can be used to initiate a task runner connection. */ async createGrantToken(req: AuthlessRequest) { - const result = await taskRunnerAuthRequestBodySchema.safeParseAsync(req.body); + const result = await taskBrokerAuthRequestBodySchema.safeParseAsync(req.body); if (!result.success) { throw new BadRequestError(result.error.errors[0].code); } const { token: authToken } = result.data; - if (!this.taskRunnerAuthService.isValidAuthToken(authToken)) { + if (!this.authService.isValidAuthToken(authToken)) { throw new ForbiddenError(); } - const grantToken = await this.taskRunnerAuthService.createGrantToken(); + const grantToken = await this.authService.createGrantToken(); return { token: grantToken, }; @@ -43,7 +43,7 @@ export class TaskRunnerAuthController { /** * Middleware to authenticate task runner init requests */ - async authMiddleware(req: TaskRunnerServerInitRequest, res: Response, next: NextFunction) { + async authMiddleware(req: TaskBrokerServerInitRequest, res: Response, next: NextFunction) { const authHeader = req.headers.authorization; if (typeof authHeader !== 'string' || !authHeader.startsWith('Bearer ')) { res.status(401).json({ code: 401, message: 'Unauthorized' }); @@ -51,7 +51,7 @@ export class TaskRunnerAuthController { } const grantToken = authHeader.slice('Bearer '.length); - const isConsumed = await this.taskRunnerAuthService.tryConsumeGrantToken(grantToken); + const isConsumed = await this.authService.tryConsumeGrantToken(grantToken); if (!isConsumed) { res.status(403).json({ code: 403, message: 'Forbidden' }); return; diff --git a/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.schema.ts b/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.schema.ts new file mode 100644 index 0000000000..89c1acefb8 --- /dev/null +++ b/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.schema.ts @@ -0,0 +1,5 @@ +import { z } from 'zod'; + +export const taskBrokerAuthRequestBodySchema = z.object({ + token: z.string().min(1), +}); diff --git a/packages/cli/src/task-runners/auth/task-runner-auth.service.ts b/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.service.ts similarity index 97% rename from packages/cli/src/task-runners/auth/task-runner-auth.service.ts rename to packages/cli/src/task-runners/task-broker/auth/task-broker-auth.service.ts index 770a5ae1fa..1c401afed5 100644 --- a/packages/cli/src/task-runners/auth/task-runner-auth.service.ts +++ b/packages/cli/src/task-runners/task-broker/auth/task-broker-auth.service.ts @@ -8,7 +8,7 @@ import { CacheService } from '@/services/cache/cache.service'; const GRANT_TOKEN_TTL = 15 * Time.seconds.toMilliseconds; @Service() -export class TaskRunnerAuthService { +export class TaskBrokerAuthService { private readonly authToken = Buffer.from(this.globalConfig.taskRunners.authToken); constructor( diff --git a/packages/cli/src/task-runners/task-broker/errors/task-deferred.error.ts b/packages/cli/src/task-runners/task-broker/errors/task-deferred.error.ts new file mode 100644 index 0000000000..34f365af5f --- /dev/null +++ b/packages/cli/src/task-runners/task-broker/errors/task-deferred.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskDeferredError extends ApplicationError { + constructor() { + super('Task deferred until runner is ready', { level: 'info' }); + } +} diff --git a/packages/cli/src/task-runners/errors.ts b/packages/cli/src/task-runners/task-broker/errors/task-reject.error.ts similarity index 52% rename from packages/cli/src/task-runners/errors.ts rename to packages/cli/src/task-runners/task-broker/errors/task-reject.error.ts index c530e5a95d..4e3f903dd8 100644 --- a/packages/cli/src/task-runners/errors.ts +++ b/packages/cli/src/task-runners/task-broker/errors/task-reject.error.ts @@ -5,11 +5,3 @@ export class TaskRejectError extends ApplicationError { super(`Task rejected with reason: ${reason}`, { level: 'info' }); } } - -export class TaskDeferredError extends ApplicationError { - constructor() { - super('Task deferred until runner is ready', { level: 'info' }); - } -} - -export class TaskError extends ApplicationError {} diff --git a/packages/cli/src/task-runners/errors/task-runner-timeout.error.ts b/packages/cli/src/task-runners/task-broker/errors/task-runner-timeout.error.ts similarity index 100% rename from packages/cli/src/task-runners/errors/task-runner-timeout.error.ts rename to packages/cli/src/task-runners/task-broker/errors/task-runner-timeout.error.ts diff --git a/packages/cli/src/task-runners/task-runner-server.ts b/packages/cli/src/task-runners/task-broker/task-broker-server.ts similarity index 86% rename from packages/cli/src/task-runners/task-runner-server.ts rename to packages/cli/src/task-runners/task-broker/task-broker-server.ts index cadfd7aadd..ef2f0a5b28 100644 --- a/packages/cli/src/task-runners/task-runner-server.ts +++ b/packages/cli/src/task-runners/task-broker/task-broker-server.ts @@ -14,18 +14,18 @@ import { Server as WSServer } from 'ws'; import { inTest } from '@/constants'; import { bodyParser, rawBodyReader } from '@/middlewares'; import { send } from '@/response-helper'; -import { TaskRunnerAuthController } from '@/task-runners/auth/task-runner-auth.controller'; +import { TaskBrokerAuthController } from '@/task-runners/task-broker/auth/task-broker-auth.controller'; import type { - TaskRunnerServerInitRequest, - TaskRunnerServerInitResponse, -} from '@/task-runners/task-runner-types'; -import { TaskRunnerWsServer } from '@/task-runners/task-runner-ws-server'; + TaskBrokerServerInitRequest, + TaskBrokerServerInitResponse, +} from '@/task-runners/task-broker/task-broker-types'; +import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; /** - * Task Runner HTTP & WS server + * Task Broker HTTP & WS server */ @Service() -export class TaskRunnerServer { +export class TaskBrokerServer { private server: Server | undefined; private wsServer: WSServer | undefined; @@ -43,8 +43,8 @@ export class TaskRunnerServer { constructor( private readonly logger: Logger, private readonly globalConfig: GlobalConfig, - private readonly taskRunnerAuthController: TaskRunnerAuthController, - private readonly taskRunnerWsServer: TaskRunnerWsServer, + private readonly authController: TaskBrokerAuthController, + private readonly taskBrokerWsServer: TaskBrokerWsServer, ) { this.app = express(); this.app.disable('x-powered-by'); @@ -82,7 +82,7 @@ export class TaskRunnerServer { } })(); - const stopWsServerTask = this.taskRunnerWsServer.stop(); + const stopWsServerTask = this.taskBrokerWsServer.stop(); await Promise.all([stopHttpServerTask, stopWsServerTask]); } @@ -126,7 +126,7 @@ export class TaskRunnerServer { }); this.server.on('upgrade', this.handleUpgradeRequest); - this.taskRunnerWsServer.start(); + this.taskBrokerWsServer.start(); } private async setupErrorHandlers() { @@ -159,23 +159,23 @@ export class TaskRunnerServer { this.upgradeEndpoint, createRateLimiter(), // eslint-disable-next-line @typescript-eslint/unbound-method - this.taskRunnerAuthController.authMiddleware, - (req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) => - this.taskRunnerWsServer.handleRequest(req, res), + this.authController.authMiddleware, + (req: TaskBrokerServerInitRequest, res: TaskBrokerServerInitResponse) => + this.taskBrokerWsServer.handleRequest(req, res), ); const authEndpoint = `${this.getEndpointBasePath()}/auth`; this.app.post( authEndpoint, createRateLimiter(), - send(async (req) => await this.taskRunnerAuthController.createGrantToken(req)), + send(async (req) => await this.authController.createGrantToken(req)), ); this.app.get('/healthz', (_, res) => res.send({ status: 'ok' })); } private handleUpgradeRequest = ( - request: TaskRunnerServerInitRequest, + request: TaskBrokerServerInitRequest, socket: Socket, head: Buffer, ) => { diff --git a/packages/cli/src/task-runners/task-runner-types.ts b/packages/cli/src/task-runners/task-broker/task-broker-types.ts similarity index 53% rename from packages/cli/src/task-runners/task-runner-types.ts rename to packages/cli/src/task-runners/task-broker/task-broker-types.ts index ca26ed41b8..71a55cc053 100644 --- a/packages/cli/src/task-runners/task-runner-types.ts +++ b/packages/cli/src/task-runners/task-broker/task-broker-types.ts @@ -1,9 +1,8 @@ +import type { TaskRunner } from '@n8n/task-runner'; import type { Response } from 'express'; -import type { INodeExecutionData } from 'n8n-workflow'; import type WebSocket from 'ws'; -import type { TaskRunner } from './task-broker.service'; -import type { AuthlessRequest } from '../requests'; +import type { AuthlessRequest } from '../../requests'; export interface DisconnectAnalyzer { isCloudDeployment: boolean; @@ -11,19 +10,12 @@ export interface DisconnectAnalyzer { toDisconnectError(opts: DisconnectErrorOptions): Promise; } -export type DataRequestType = 'input' | 'node' | 'all'; - -export interface TaskResultData { - result: INodeExecutionData[]; - customData?: Record; -} - -export interface TaskRunnerServerInitRequest +export interface TaskBrokerServerInitRequest extends AuthlessRequest<{}, {}, {}, { id: TaskRunner['id']; token?: string }> { ws: WebSocket; } -export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest }; +export type TaskBrokerServerInitResponse = Response & { req: TaskBrokerServerInitRequest }; export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown'; diff --git a/packages/cli/src/task-runners/task-runner-ws-server.ts b/packages/cli/src/task-runners/task-broker/task-broker-ws-server.ts similarity index 90% rename from packages/cli/src/task-runners/task-runner-ws-server.ts rename to packages/cli/src/task-runners/task-broker/task-broker-ws-server.ts index dbec7cef3e..7b65140717 100644 --- a/packages/cli/src/task-runners/task-runner-ws-server.ts +++ b/packages/cli/src/task-runners/task-broker/task-broker-ws-server.ts @@ -6,16 +6,16 @@ import { ApplicationError, jsonStringify } from 'n8n-workflow'; import type WebSocket from 'ws'; import { Time, WsStatusCodes } from '@/constants'; - -import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; -import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service'; -import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; +import { DefaultTaskRunnerDisconnectAnalyzer } from '@/task-runners/default-task-runner-disconnect-analyzer'; import type { DisconnectAnalyzer, DisconnectReason, - TaskRunnerServerInitRequest, - TaskRunnerServerInitResponse, -} from './task-runner-types'; + TaskBrokerServerInitRequest, + TaskBrokerServerInitResponse, +} from '@/task-runners/task-broker/task-broker-types'; +import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events'; + +import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -23,8 +23,12 @@ function heartbeat(this: WebSocket) { type WsStatusCode = (typeof WsStatusCodes)[keyof typeof WsStatusCodes]; +/** + * Responsible for handling WebSocket connections with task runners + * and monitoring the connection liveness + */ @Service() -export class TaskRunnerWsServer { +export class TaskBrokerWsServer { runnerConnections: Map = new Map(); private heartbeatTimer: NodeJS.Timer | undefined; @@ -164,7 +168,7 @@ export class TaskRunnerWsServer { } } - handleRequest(req: TaskRunnerServerInitRequest, _res: TaskRunnerServerInitResponse) { + handleRequest(req: TaskBrokerServerInitRequest, _res: TaskBrokerServerInitResponse) { this.add(req.query.id, req.ws); } diff --git a/packages/cli/src/task-runners/task-broker.service.ts b/packages/cli/src/task-runners/task-broker/task-broker.service.ts similarity index 97% rename from packages/cli/src/task-runners/task-broker.service.ts rename to packages/cli/src/task-runners/task-broker/task-broker.service.ts index 42e0d3cd25..3afd182914 100644 --- a/packages/cli/src/task-runners/task-broker.service.ts +++ b/packages/cli/src/task-runners/task-broker/task-broker.service.ts @@ -12,10 +12,10 @@ import { nanoid } from 'nanoid'; import config from '@/config'; import { Time } from '@/constants'; - -import { TaskDeferredError, TaskRejectError } from './errors'; -import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error'; -import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; +import { TaskDeferredError } from '@/task-runners/task-broker/errors/task-deferred.error'; +import { TaskRejectError } from '@/task-runners/task-broker/errors/task-reject.error'; +import { TaskRunnerTimeoutError } from '@/task-runners/task-broker/errors/task-runner-timeout.error'; +import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events'; export interface TaskRunner { id: string; diff --git a/packages/cli/src/task-runners/task-managers/local-task-requester.ts b/packages/cli/src/task-runners/task-managers/local-task-requester.ts index 1de959936b..4fc45ccd6c 100644 --- a/packages/cli/src/task-runners/task-managers/local-task-requester.ts +++ b/packages/cli/src/task-runners/task-managers/local-task-requester.ts @@ -2,10 +2,10 @@ import { Container, Service } from '@n8n/di'; import type { RequesterMessage } from '@n8n/task-runner'; import { NodeTypes } from '@/node-types'; +import type { RequesterMessageCallback } from '@/task-runners/task-broker/task-broker.service'; +import { TaskBroker } from '@/task-runners/task-broker/task-broker.service'; import { TaskRequester } from './task-requester'; -import type { RequesterMessageCallback } from '../task-broker.service'; -import { TaskBroker } from '../task-broker.service'; @Service() export class LocalTaskRequester extends TaskRequester { diff --git a/packages/cli/src/task-runners/task-runner-module.ts b/packages/cli/src/task-runners/task-runner-module.ts index bcc54f653f..0ba60123f9 100644 --- a/packages/cli/src/task-runners/task-runner-module.ts +++ b/packages/cli/src/task-runners/task-runner-module.ts @@ -6,13 +6,13 @@ import * as a from 'node:assert/strict'; import { OnShutdown } from '@/decorators/on-shutdown'; import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; +import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; import type { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; import { MissingAuthTokenError } from './errors/missing-auth-token.error'; +import type { TaskBrokerServer } from './task-broker/task-broker-server'; import type { LocalTaskRequester } from './task-managers/local-task-requester'; -import type { TaskRunnerServer } from './task-runner-server'; -import { TaskRunnerWsServer } from './task-runner-ws-server'; /** * Module responsible for loading and starting task runner. Task runner can be @@ -21,9 +21,9 @@ import { TaskRunnerWsServer } from './task-runner-ws-server'; */ @Service() export class TaskRunnerModule { - private taskRunnerHttpServer: TaskRunnerServer | undefined; + private taskBrokerHttpServer: TaskBrokerServer | undefined; - private taskRunnerWsServer: TaskRunnerWsServer | undefined; + private taskBrokerWsServer: TaskBrokerWsServer | undefined; private taskRequester: LocalTaskRequester | undefined; @@ -47,7 +47,7 @@ export class TaskRunnerModule { if (mode === 'external' && !authToken) throw new MissingAuthTokenError(); await this.loadTaskRequester(); - await this.loadTaskRunnerServer(); + await this.loadTaskBroker(); if (mode === 'internal') { await this.startInternalTaskRunner(); @@ -64,9 +64,9 @@ export class TaskRunnerModule { })(); const stopRunnerServerTask = (async () => { - if (this.taskRunnerHttpServer) { - await this.taskRunnerHttpServer.stop(); - this.taskRunnerHttpServer = undefined; + if (this.taskBrokerHttpServer) { + await this.taskBrokerHttpServer.stop(); + this.taskBrokerHttpServer = undefined; } })(); @@ -82,18 +82,18 @@ export class TaskRunnerModule { Container.set(TaskRequester, this.taskRequester); } - private async loadTaskRunnerServer() { + private async loadTaskBroker() { // These are imported dynamically because we need to set the task manager // instance before importing them - const { TaskRunnerServer } = await import('@/task-runners/task-runner-server'); - this.taskRunnerHttpServer = Container.get(TaskRunnerServer); - this.taskRunnerWsServer = Container.get(TaskRunnerWsServer); + const { TaskBrokerServer } = await import('@/task-runners/task-broker/task-broker-server'); + this.taskBrokerHttpServer = Container.get(TaskBrokerServer); + this.taskBrokerWsServer = Container.get(TaskBrokerWsServer); - await this.taskRunnerHttpServer.start(); + await this.taskBrokerHttpServer.start(); } private async startInternalTaskRunner() { - a.ok(this.taskRunnerWsServer, 'Task Runner WS Server not loaded'); + a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded'); const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process'); this.taskRunnerProcess = Container.get(TaskRunnerProcess); @@ -110,7 +110,7 @@ export class TaskRunnerModule { const { InternalTaskRunnerDisconnectAnalyzer } = await import( '@/task-runners/internal-task-runner-disconnect-analyzer' ); - this.taskRunnerWsServer.setDisconnectAnalyzer( + this.taskBrokerWsServer.setDisconnectAnalyzer( Container.get(InternalTaskRunnerDisconnectAnalyzer), ); } diff --git a/packages/cli/src/task-runners/task-runner-process.ts b/packages/cli/src/task-runners/task-runner-process.ts index c2e769e6ec..26cfefc469 100644 --- a/packages/cli/src/task-runners/task-runner-process.ts +++ b/packages/cli/src/task-runners/task-runner-process.ts @@ -7,9 +7,9 @@ import * as process from 'node:process'; import { OnShutdown } from '@/decorators/on-shutdown'; -import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; import { forwardToLogger } from './forward-to-logger'; import { NodeProcessOomDetector } from './node-process-oom-detector'; +import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service'; import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; import { TypedEmitter } from '../typed-emitter'; @@ -68,7 +68,7 @@ export class TaskRunnerProcess extends TypedEmitter { constructor( logger: Logger, private readonly runnerConfig: TaskRunnersConfig, - private readonly authService: TaskRunnerAuthService, + private readonly authService: TaskBrokerAuthService, private readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, ) { super(); diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 8b9cebe854..36d6322ec7 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -17,8 +17,8 @@ import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; import { OrchestrationService } from '@/services/orchestration.service'; +import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; -import { TaskRunnerServer } from '@/task-runners/task-runner-server'; import { Telemetry } from '@/telemetry'; import { setupTestCommand } from '@test-integration/utils/test-command'; @@ -36,7 +36,7 @@ const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const scalingService = mockInstance(ScalingService); const orchestrationService = mockInstance(OrchestrationService); -const taskRunnerServer = mockInstance(TaskRunnerServer); +const taskBrokerServer = mockInstance(TaskBrokerServer); const taskRunnerProcess = mockInstance(TaskRunnerProcess); mockInstance(Publisher); mockInstance(Subscriber); @@ -60,7 +60,7 @@ test('worker initializes all its components', async () => { expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(orchestrationService.init).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); - expect(taskRunnerServer.start).toHaveBeenCalledTimes(1); + expect(taskBrokerServer.start).toHaveBeenCalledTimes(1); expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1); expect(config.getEnv('executions.mode')).toBe('queue'); diff --git a/packages/cli/test/integration/shared/utils/task-broker-test-server.ts b/packages/cli/test/integration/shared/utils/task-broker-test-server.ts index 63a96678a2..a908d5ce5d 100644 --- a/packages/cli/test/integration/shared/utils/task-broker-test-server.ts +++ b/packages/cli/test/integration/shared/utils/task-broker-test-server.ts @@ -3,10 +3,10 @@ import { Container } from '@n8n/di'; import request from 'supertest'; import type TestAgent from 'supertest/lib/agent'; -import { TaskRunnerServer } from '@/task-runners/task-runner-server'; +import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server'; export interface TestTaskBrokerServer { - server: TaskRunnerServer; + server: TaskBrokerServer; agent: TestAgent; config: TaskRunnersConfig; } @@ -29,11 +29,11 @@ export const setupBrokerTestServer = ( runnerConfig.enabled = true; runnerConfig.port = 0; // Use any port - const taskRunnerServer = Container.get(TaskRunnerServer); - const agent = request.agent(taskRunnerServer.app); + const taskBrokerServer = Container.get(TaskBrokerServer); + const agent = request.agent(taskBrokerServer.app); return { - server: taskRunnerServer, + server: taskBrokerServer, agent, config: runnerConfig, }; diff --git a/packages/cli/test/integration/task-runners/task-broker-server.test.ts b/packages/cli/test/integration/task-runners/task-broker-server.test.ts new file mode 100644 index 0000000000..ab17d54820 --- /dev/null +++ b/packages/cli/test/integration/task-runners/task-broker-server.test.ts @@ -0,0 +1,44 @@ +import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; + +describe('TaskBrokerServer', () => { + const { agent, server } = setupBrokerTestServer({ + authToken: 'token', + mode: 'external', + }); + + beforeAll(async () => { + await server.start(); + }); + + afterAll(async () => { + await server.stop(); + }); + + describe('/healthz', () => { + it('should return 200', async () => { + await agent.get('/healthz').expect(200); + }); + }); + + describe('/runners/_ws', () => { + it('should return 429 when too many requests are made', async () => { + await agent.post('/runners/_ws').send({}).expect(401); + await agent.post('/runners/_ws').send({}).expect(401); + await agent.post('/runners/_ws').send({}).expect(401); + await agent.post('/runners/_ws').send({}).expect(401); + await agent.post('/runners/_ws').send({}).expect(401); + await agent.post('/runners/_ws').send({}).expect(429); + }); + }); + + describe('/runners/auth', () => { + it('should return 429 when too many requests are made', async () => { + await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403); + await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403); + await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403); + await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403); + await agent.post('/runners/auth').send({ token: 'invalid' }).expect(403); + await agent.post('/runners/auth').send({ token: 'invalid' }).expect(429); + }); + }); +}); diff --git a/packages/cli/test/integration/task-runners/task-runner-module.external.test.ts b/packages/cli/test/integration/task-runners/task-runner-module.external.test.ts index d111854db6..f60b53bdad 100644 --- a/packages/cli/test/integration/task-runners/task-runner-module.external.test.ts +++ b/packages/cli/test/integration/task-runners/task-runner-module.external.test.ts @@ -3,10 +3,10 @@ import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { MissingAuthTokenError } from '@/task-runners/errors/missing-auth-token.error'; +import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; import { TaskRunnerModule } from '@/task-runners/task-runner-module'; import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/task-runners/default-task-runner-disconnect-analyzer'; -import { TaskRunnerWsServer } from '../../../src/task-runners/task-runner-ws-server'; describe('TaskRunnerModule in external mode', () => { const runnerConfig = Container.get(TaskRunnersConfig); @@ -46,7 +46,7 @@ describe('TaskRunnerModule in external mode', () => { }); it('should use DefaultTaskRunnerDisconnectAnalyzer', () => { - const wsServer = Container.get(TaskRunnerWsServer); + const wsServer = Container.get(TaskBrokerWsServer); expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(DefaultTaskRunnerDisconnectAnalyzer); }); diff --git a/packages/cli/test/integration/task-runners/task-runner-module.internal.test.ts b/packages/cli/test/integration/task-runners/task-runner-module.internal.test.ts index 3c373e9614..15876f04ac 100644 --- a/packages/cli/test/integration/task-runners/task-runner-module.internal.test.ts +++ b/packages/cli/test/integration/task-runners/task-runner-module.internal.test.ts @@ -1,10 +1,10 @@ import { TaskRunnersConfig } from '@n8n/config'; import { Container } from '@n8n/di'; +import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; import { TaskRunnerModule } from '@/task-runners/task-runner-module'; import { InternalTaskRunnerDisconnectAnalyzer } from '../../../src/task-runners/internal-task-runner-disconnect-analyzer'; -import { TaskRunnerWsServer } from '../../../src/task-runners/task-runner-ws-server'; describe('TaskRunnerModule in internal mode', () => { const runnerConfig = Container.get(TaskRunnersConfig); @@ -33,7 +33,7 @@ describe('TaskRunnerModule in internal mode', () => { }); it('should use InternalTaskRunnerDisconnectAnalyzer', () => { - const wsServer = Container.get(TaskRunnerWsServer); + const wsServer = Container.get(TaskBrokerWsServer); expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(InternalTaskRunnerDisconnectAnalyzer); }); diff --git a/packages/cli/test/integration/task-runners/task-runner-process.test.ts b/packages/cli/test/integration/task-runners/task-runner-process.test.ts index bd6dae00b0..6e5100e24a 100644 --- a/packages/cli/test/integration/task-runners/task-runner-process.test.ts +++ b/packages/cli/test/integration/task-runners/task-runner-process.test.ts @@ -1,9 +1,9 @@ import { Container } from '@n8n/di'; -import { TaskBroker } from '@/task-runners/task-broker.service'; +import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; +import { TaskBroker } from '@/task-runners/task-broker/task-broker.service'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; -import { TaskRunnerWsServer } from '@/task-runners/task-runner-ws-server'; import { retryUntil } from '@test-integration/retry-until'; import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; @@ -13,7 +13,7 @@ describe('TaskRunnerProcess', () => { }); const runnerProcess = Container.get(TaskRunnerProcess); const taskBroker = Container.get(TaskBroker); - const taskRunnerService = Container.get(TaskRunnerWsServer); + const taskRunnerService = Container.get(TaskBrokerWsServer); beforeAll(async () => { await taskRunnerServer.start(); diff --git a/packages/cli/test/integration/task-runners/task-runner-server.test.ts b/packages/cli/test/integration/task-runners/task-runner-server.test.ts index 6088af3525..ab17d54820 100644 --- a/packages/cli/test/integration/task-runners/task-runner-server.test.ts +++ b/packages/cli/test/integration/task-runners/task-runner-server.test.ts @@ -1,6 +1,6 @@ import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; -describe('TaskRunnerServer', () => { +describe('TaskBrokerServer', () => { const { agent, server } = setupBrokerTestServer({ authToken: 'token', mode: 'external',