diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 70f52f8cb8..041b6a8741 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -22,8 +22,6 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus' import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; import { License } from '@/license'; -import { LocalTaskManager } from '@/runners/task-managers/local-task-manager'; -import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Server } from '@/server'; @@ -224,19 +222,9 @@ export class Start extends BaseCommand { const { taskRunners: taskRunnerConfig } = this.globalConfig; if (!taskRunnerConfig.disabled) { - Container.set(TaskManager, new LocalTaskManager()); - const { TaskRunnerServer } = await import('@/runners/task-runner-server'); - const taskRunnerServer = Container.get(TaskRunnerServer); - await taskRunnerServer.start(); - - if ( - taskRunnerConfig.mode === 'internal_childprocess' || - taskRunnerConfig.mode === 'internal_launcher' - ) { - const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); - const runnerProcess = Container.get(TaskRunnerProcess); - await runnerProcess.start(); - } + const { TaskRunnerModule } = await import('@/runners/task-runner-module'); + const taskRunnerModule = Container.get(TaskRunnerModule); + await taskRunnerModule.start(); } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 4ba505a9b8..730c6f6e80 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -8,8 +8,6 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { Logger } from '@/logging/logger.service'; -import { LocalTaskManager } from '@/runners/task-managers/local-task-manager'; -import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; @@ -116,19 +114,9 @@ export class Worker extends BaseCommand { const { taskRunners: taskRunnerConfig } = this.globalConfig; if (!taskRunnerConfig.disabled) { - Container.set(TaskManager, new LocalTaskManager()); - const { TaskRunnerServer } = await import('@/runners/task-runner-server'); - const taskRunnerServer = Container.get(TaskRunnerServer); - await taskRunnerServer.start(); - - if ( - taskRunnerConfig.mode === 'internal_childprocess' || - taskRunnerConfig.mode === 'internal_launcher' - ) { - const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); - const runnerProcess = Container.get(TaskRunnerProcess); - await runnerProcess.start(); - } + const { TaskRunnerModule } = await import('@/runners/task-runner-module'); + const taskRunnerModule = Container.get(TaskRunnerModule); + await taskRunnerModule.start(); } } diff --git a/packages/cli/src/runners/__tests__/task-runner-process.test.ts b/packages/cli/src/runners/__tests__/task-runner-process.test.ts index 738333bc25..eb04e3ab8e 100644 --- a/packages/cli/src/runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-process.test.ts @@ -32,10 +32,10 @@ describe('TaskRunnerProcess', () => { }); describe('constructor', () => { - it('should not throw if runner mode is external', () => { + it('should throw if runner mode is external', () => { runnerConfig.mode = 'external'; - expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).not.toThrow(); + expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow(); runnerConfig.mode = 'internal_childprocess'; }); diff --git a/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts new file mode 100644 index 0000000000..e101c65e28 --- /dev/null +++ b/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts @@ -0,0 +1,16 @@ +import { Service } from 'typedi'; + +import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; +import type { DisconnectAnalyzer } from './runner-types'; +import type { TaskRunner } from './task-broker.service'; + +/** + * Analyzes the disconnect reason of a task runner to provide a more + * meaningful error message to the user. + */ +@Service() +export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer { + async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { + return new TaskRunnerDisconnectedError(runnerId); + } +} diff --git a/packages/cli/src/runners/task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts similarity index 88% rename from packages/cli/src/runners/task-runner-disconnect-analyzer.ts rename to packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts index d75a1b9aad..e3b9520f77 100644 --- a/packages/cli/src/runners/task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts @@ -3,7 +3,7 @@ import { Service } from 'typedi'; import config from '@/config'; -import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; +import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { TaskRunnerOomError } from './errors/task-runner-oom-error'; import { SlidingWindowSignal } from './sliding-window-signal'; import type { TaskRunner } from './task-broker.service'; @@ -15,13 +15,19 @@ import { TaskRunnerProcess } from './task-runner-process'; * meaningful error message to the user. */ @Service() -export class TaskRunnerDisconnectAnalyzer { +export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisconnectAnalyzer { + private get isCloudDeployment() { + return config.get('deployment.type') === 'cloud'; + } + private readonly exitReasonSignal: SlidingWindowSignal; constructor( private readonly runnerConfig: TaskRunnersConfig, private readonly taskRunnerProcess: TaskRunnerProcess, ) { + super(); + // When the task runner process is running as a child process, there's // no determinate time when it exits compared to when the runner disconnects // (i.e. it's a race condition). Hence we use a sliding window to determine @@ -32,17 +38,13 @@ export class TaskRunnerDisconnectAnalyzer { }); } - private get isCloudDeployment() { - return config.get('deployment.type') === 'cloud'; - } - async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { const exitCode = await this.awaitExitSignal(); if (exitCode === 'oom') { return new TaskRunnerOomError(runnerId, this.isCloudDeployment); } - return new TaskRunnerDisconnectedError(runnerId); + return await super.determineDisconnectReason(runnerId); } private async awaitExitSignal(): Promise { diff --git a/packages/cli/src/runners/runner-types.ts b/packages/cli/src/runners/runner-types.ts index a030f3874e..c5d4eb81c3 100644 --- a/packages/cli/src/runners/runner-types.ts +++ b/packages/cli/src/runners/runner-types.ts @@ -17,6 +17,12 @@ export interface TaskDataRequestParams { env: boolean; } +export interface DisconnectAnalyzer { + determineDisconnectReason(runnerId: TaskRunner['id']): Promise; +} + +export type DataRequestType = 'input' | 'node' | 'all'; + export interface TaskResultData { result: INodeExecutionData[]; customData?: Record; diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index 59bb92ff76..5d4c4e9607 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -3,29 +3,38 @@ import type WebSocket from 'ws'; import { Logger } from '@/logging/logger.service'; +import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import type { RunnerMessage, N8nMessage, TaskRunnerServerInitRequest, TaskRunnerServerInitResponse, + DisconnectAnalyzer, } from './runner-types'; import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service'; -import { TaskRunnerDisconnectAnalyzer } from './task-runner-disconnect-analyzer'; function heartbeat(this: WebSocket) { this.isAlive = true; } @Service() -export class TaskRunnerService { +export class TaskRunnerWsServer { runnerConnections: Map = new Map(); constructor( private readonly logger: Logger, private readonly taskBroker: TaskBroker, - private readonly disconnectAnalyzer: TaskRunnerDisconnectAnalyzer, + private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer, ) {} + setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) { + this.disconnectAnalyzer = disconnectAnalyzer; + } + + getDisconnectAnalyzer() { + return this.disconnectAnalyzer; + } + sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) { this.runnerConnections.get(id)?.send(JSON.stringify(message)); } diff --git a/packages/cli/src/runners/task-runner-module.ts b/packages/cli/src/runners/task-runner-module.ts new file mode 100644 index 0000000000..13521c599b --- /dev/null +++ b/packages/cli/src/runners/task-runner-module.ts @@ -0,0 +1,85 @@ +import { TaskRunnersConfig } from '@n8n/config'; +import * as a from 'node:assert/strict'; +import Container, { Service } from 'typedi'; + +import type { TaskRunnerProcess } from '@/runners/task-runner-process'; + +import { TaskRunnerWsServer } from './runner-ws-server'; +import type { LocalTaskManager } from './task-managers/local-task-manager'; +import type { TaskRunnerServer } from './task-runner-server'; + +/** + * Module responsible for loading and starting task runner. Task runner can be + * run either internally (=launched by n8n as a child process) or externally + * (=launched by some other orchestrator) + */ +@Service() +export class TaskRunnerModule { + private taskRunnerHttpServer: TaskRunnerServer | undefined; + + private taskRunnerWsServer: TaskRunnerWsServer | undefined; + + private taskManager: LocalTaskManager | undefined; + + private taskRunnerProcess: TaskRunnerProcess | undefined; + + constructor(private readonly runnerConfig: TaskRunnersConfig) {} + + async start() { + a.ok(!this.runnerConfig.disabled, 'Task runner is disabled'); + + await this.loadTaskManager(); + await this.loadTaskRunnerServer(); + + if ( + this.runnerConfig.mode === 'internal_childprocess' || + this.runnerConfig.mode === 'internal_launcher' + ) { + await this.startInternalTaskRunner(); + } + } + + async stop() { + if (this.taskRunnerProcess) { + await this.taskRunnerProcess.stop(); + this.taskRunnerProcess = undefined; + } + + if (this.taskRunnerHttpServer) { + await this.taskRunnerHttpServer.stop(); + this.taskRunnerHttpServer = undefined; + } + } + + private async loadTaskManager() { + const { TaskManager } = await import('@/runners/task-managers/task-manager'); + const { LocalTaskManager } = await import('@/runners/task-managers/local-task-manager'); + this.taskManager = new LocalTaskManager(); + Container.set(TaskManager, this.taskManager); + } + + private async loadTaskRunnerServer() { + // These are imported dynamically because we need to set the task manager + // instance before importing them + const { TaskRunnerServer } = await import('@/runners/task-runner-server'); + this.taskRunnerHttpServer = Container.get(TaskRunnerServer); + this.taskRunnerWsServer = Container.get(TaskRunnerWsServer); + + await this.taskRunnerHttpServer.start(); + } + + private async startInternalTaskRunner() { + a.ok(this.taskRunnerWsServer, 'Task Runner WS Server not loaded'); + + const { TaskRunnerProcess } = await import('@/runners/task-runner-process'); + this.taskRunnerProcess = Container.get(TaskRunnerProcess); + await this.taskRunnerProcess.start(); + + const { InternalTaskRunnerDisconnectAnalyzer } = await import( + '@/runners/internal-task-runner-disconnect-analyzer' + ); + this.taskRunnerWsServer.setDisconnectAnalyzer( + Container.get(InternalTaskRunnerDisconnectAnalyzer), + ); + } +} diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index 917ce2b75a..2eaa62621c 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -68,14 +68,15 @@ export class TaskRunnerProcess extends TypedEmitter { ) { super(); + a.ok( + this.runnerConfig.mode !== 'external', + 'Task Runner Process cannot be used in external mode', + ); + this.logger = logger.scoped('task-runner'); } async start() { - a.ok( - this.runnerConfig.mode === 'internal_childprocess' || - this.runnerConfig.mode === 'internal_launcher', - ); a.ok(!this.process, 'Task Runner Process already running'); const grantToken = await this.authService.createGrantToken(); diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts index 2199e70b38..6dd0fd5919 100644 --- a/packages/cli/src/runners/task-runner-server.ts +++ b/packages/cli/src/runners/task-runner-server.ts @@ -19,7 +19,7 @@ import type { TaskRunnerServerInitRequest, TaskRunnerServerInitResponse, } from '@/runners/runner-types'; -import { TaskRunnerService } from '@/runners/runner-ws-server'; +import { TaskRunnerWsServer } from '@/runners/runner-ws-server'; /** * Task Runner HTTP & WS server @@ -44,7 +44,7 @@ export class TaskRunnerServer { private readonly logger: Logger, private readonly globalConfig: GlobalConfig, private readonly taskRunnerAuthController: TaskRunnerAuthController, - private readonly taskRunnerService: TaskRunnerService, + private readonly taskRunnerService: TaskRunnerWsServer, ) { this.app = express(); this.app.disable('x-powered-by'); diff --git a/packages/cli/test/integration/runners/task-runner-module.external.test.ts b/packages/cli/test/integration/runners/task-runner-module.external.test.ts new file mode 100644 index 0000000000..e8a7e54f1a --- /dev/null +++ b/packages/cli/test/integration/runners/task-runner-module.external.test.ts @@ -0,0 +1,40 @@ +import { TaskRunnersConfig } from '@n8n/config'; +import Container from 'typedi'; + +import { TaskRunnerModule } from '@/runners/task-runner-module'; + +import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/runners/default-task-runner-disconnect-analyzer'; +import { TaskRunnerWsServer } from '../../../src/runners/runner-ws-server'; + +describe('TaskRunnerModule in external mode', () => { + const runnerConfig = Container.get(TaskRunnersConfig); + runnerConfig.mode = 'external'; + runnerConfig.port = 0; + const module = Container.get(TaskRunnerModule); + + afterEach(async () => { + await module.stop(); + }); + + describe('start', () => { + it('should throw if the task runner is disabled', async () => { + runnerConfig.disabled = true; + + // Act + await expect(module.start()).rejects.toThrow('Task runner is disabled'); + }); + + it('should start the task runner', async () => { + runnerConfig.disabled = false; + + // Act + await module.start(); + }); + + it('should use DefaultTaskRunnerDisconnectAnalyzer', () => { + const wsServer = Container.get(TaskRunnerWsServer); + + expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(DefaultTaskRunnerDisconnectAnalyzer); + }); + }); +}); diff --git a/packages/cli/test/integration/runners/task-runner-module.internal.test.ts b/packages/cli/test/integration/runners/task-runner-module.internal.test.ts new file mode 100644 index 0000000000..f53adde5e7 --- /dev/null +++ b/packages/cli/test/integration/runners/task-runner-module.internal.test.ts @@ -0,0 +1,39 @@ +import { TaskRunnersConfig } from '@n8n/config'; +import Container from 'typedi'; + +import { TaskRunnerModule } from '@/runners/task-runner-module'; + +import { InternalTaskRunnerDisconnectAnalyzer } from '../../../src/runners/internal-task-runner-disconnect-analyzer'; +import { TaskRunnerWsServer } from '../../../src/runners/runner-ws-server'; + +describe('TaskRunnerModule in internal_childprocess mode', () => { + const runnerConfig = Container.get(TaskRunnersConfig); + runnerConfig.mode = 'internal_childprocess'; + const module = Container.get(TaskRunnerModule); + + afterEach(async () => { + await module.stop(); + }); + + describe('start', () => { + it('should throw if the task runner is disabled', async () => { + runnerConfig.disabled = true; + + // Act + await expect(module.start()).rejects.toThrow('Task runner is disabled'); + }); + + it('should start the task runner', async () => { + runnerConfig.disabled = false; + + // Act + await module.start(); + }); + + it('should use InternalTaskRunnerDisconnectAnalyzer', () => { + const wsServer = Container.get(TaskRunnerWsServer); + + expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(InternalTaskRunnerDisconnectAnalyzer); + }); + }); +}); diff --git a/packages/cli/test/integration/runners/task-runner-process.test.ts b/packages/cli/test/integration/runners/task-runner-process.test.ts index c893add440..219fbc8813 100644 --- a/packages/cli/test/integration/runners/task-runner-process.test.ts +++ b/packages/cli/test/integration/runners/task-runner-process.test.ts @@ -1,7 +1,7 @@ import { TaskRunnersConfig } from '@n8n/config'; import Container from 'typedi'; -import { TaskRunnerService } from '@/runners/runner-ws-server'; +import { TaskRunnerWsServer } from '@/runners/runner-ws-server'; import { TaskBroker } from '@/runners/task-broker.service'; import { TaskRunnerProcess } from '@/runners/task-runner-process'; import { TaskRunnerServer } from '@/runners/task-runner-server'; @@ -18,7 +18,7 @@ describe('TaskRunnerProcess', () => { const runnerProcess = Container.get(TaskRunnerProcess); const taskBroker = Container.get(TaskBroker); - const taskRunnerService = Container.get(TaskRunnerService); + const taskRunnerService = Container.get(TaskRunnerWsServer); const startLauncherSpy = jest.spyOn(runnerProcess, 'startLauncher'); const startNodeSpy = jest.spyOn(runnerProcess, 'startNode');