diff --git a/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts b/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts index 223cdbdc54..003e2c5f4e 100644 --- a/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts @@ -17,12 +17,14 @@ describe('TaskRunnerWsServer', () => { mock(), ); + server.start(); + expect(setIntervalSpy).toHaveBeenCalledWith( expect.any(Function), 30 * Time.seconds.toMilliseconds, ); - await server.shutdown(); + await server.stop(); }); it('should clear heartbeat timer on server stop', async () => { @@ -36,8 +38,9 @@ describe('TaskRunnerWsServer', () => { mock({ path: '/runners', heartbeatInterval: 30 }), mock(), ); + server.start(); - await server.shutdown(); + await server.stop(); expect(clearIntervalSpy).toHaveBeenCalled(); }); diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index 27a0d779e7..195490589d 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -21,6 +21,16 @@ function heartbeat(this: WebSocket) { this.isAlive = true; } +const enum WsStatusCode { + CloseNormal = 1000, + CloseGoingAway = 1001, + CloseProtocolError = 1002, + CloseUnsupportedData = 1003, + CloseNoStatus = 1005, + CloseAbnormal = 1006, + CloseInvalidData = 1007, +} + @Service() export class TaskRunnerWsServer { runnerConnections: Map = new Map(); @@ -33,7 +43,9 @@ export class TaskRunnerWsServer { private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer, private readonly taskTunnersConfig: TaskRunnersConfig, private readonly runnerLifecycleEvents: RunnerLifecycleEvents, - ) { + ) {} + + start() { this.startHeartbeatChecks(); } @@ -47,7 +59,11 @@ export class TaskRunnerWsServer { this.heartbeatTimer = setInterval(() => { for (const [runnerId, connection] of this.runnerConnections.entries()) { if (!connection.isAlive) { - void this.removeConnection(runnerId, 'failed-heartbeat-check'); + void this.removeConnection( + runnerId, + 'failed-heartbeat-check', + WsStatusCode.CloseNoStatus, + ); this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check'); return; } @@ -57,17 +73,13 @@ export class TaskRunnerWsServer { }, heartbeatInterval * Time.seconds.toMilliseconds); } - async shutdown() { + async stop() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = undefined; } - await Promise.all( - Array.from(this.runnerConnections.keys()).map( - async (id) => await this.removeConnection(id, 'shutting-down'), - ), - ); + await this.stopConnectedRunners(); } setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) { @@ -141,7 +153,11 @@ export class TaskRunnerWsServer { ); } - async removeConnection(id: TaskRunner['id'], reason: DisconnectReason = 'unknown') { + async removeConnection( + id: TaskRunner['id'], + reason: DisconnectReason = 'unknown', + code?: WsStatusCode, + ) { const connection = this.runnerConnections.get(id); if (connection) { const disconnectError = await this.disconnectAnalyzer.toDisconnectError({ @@ -150,7 +166,7 @@ export class TaskRunnerWsServer { heartbeatInterval: this.taskTunnersConfig.heartbeatInterval, }); this.taskBroker.deregisterRunner(id, disconnectError); - connection.close(); + connection.close(code); this.runnerConnections.delete(id); } } @@ -158,4 +174,14 @@ export class TaskRunnerWsServer { handleRequest(req: TaskRunnerServerInitRequest, _res: TaskRunnerServerInitResponse) { this.add(req.query.id, req.ws); } + + private async stopConnectedRunners() { + // TODO: We should give runners some time to finish their tasks before + // shutting them down + await Promise.all( + Array.from(this.runnerConnections.keys()).map( + async (id) => await this.removeConnection(id, 'shutting-down', WsStatusCode.CloseGoingAway), + ), + ); + } } diff --git a/packages/cli/src/runners/task-runner-module.ts b/packages/cli/src/runners/task-runner-module.ts index 123dc723e6..1502dd1f07 100644 --- a/packages/cli/src/runners/task-runner-module.ts +++ b/packages/cli/src/runners/task-runner-module.ts @@ -2,6 +2,7 @@ import { TaskRunnersConfig } from '@n8n/config'; import * as a from 'node:assert/strict'; import Container, { Service } from 'typedi'; +import { OnShutdown } from '@/decorators/on-shutdown'; import type { TaskRunnerProcess } from '@/runners/task-runner-process'; import { MissingAuthTokenError } from './errors/missing-auth-token.error'; @@ -41,16 +42,23 @@ export class TaskRunnerModule { } } + @OnShutdown() async stop() { - if (this.taskRunnerProcess) { - await this.taskRunnerProcess.stop(); - this.taskRunnerProcess = undefined; - } + const stopRunnerProcessTask = (async () => { + if (this.taskRunnerProcess) { + await this.taskRunnerProcess.stop(); + this.taskRunnerProcess = undefined; + } + })(); - if (this.taskRunnerHttpServer) { - await this.taskRunnerHttpServer.stop(); - this.taskRunnerHttpServer = undefined; - } + const stopRunnerServerTask = (async () => { + if (this.taskRunnerHttpServer) { + await this.taskRunnerHttpServer.stop(); + this.taskRunnerHttpServer = undefined; + } + })(); + + await Promise.all([stopRunnerProcessTask, stopRunnerServerTask]); } private async loadTaskManager() { diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts index eb428b52fa..a3b13fb8c4 100644 --- a/packages/cli/src/runners/task-runner-server.ts +++ b/packages/cli/src/runners/task-runner-server.ts @@ -9,8 +9,7 @@ import { parse as parseUrl } from 'node:url'; import { Service } from 'typedi'; import { Server as WSServer } from 'ws'; -import { inTest, LOWEST_SHUTDOWN_PRIORITY } from '@/constants'; -import { OnShutdown } from '@/decorators/on-shutdown'; +import { inTest } from '@/constants'; import { Logger } from '@/logging/logger.service'; import { bodyParser, rawBodyReader } from '@/middlewares'; import { send } from '@/response-helper'; @@ -69,16 +68,22 @@ export class TaskRunnerServer { this.configureRoutes(); } - @OnShutdown(LOWEST_SHUTDOWN_PRIORITY) async stop(): Promise { if (this.wsServer) { this.wsServer.close(); this.wsServer = undefined; } - if (this.server) { - await new Promise((resolve) => this.server?.close(() => resolve())); - this.server = undefined; - } + + const stopHttpServerTask = (async () => { + if (this.server) { + await new Promise((resolve) => this.server?.close(() => resolve())); + this.server = undefined; + } + })(); + + const stopWsServerTask = this.taskRunnerWsServer.stop(); + + await Promise.all([stopHttpServerTask, stopWsServerTask]); } /** Creates an HTTP server and listens to the configured port */ @@ -119,6 +124,8 @@ export class TaskRunnerServer { maxPayload: this.globalConfig.taskRunners.maxPayload, }); this.server.on('upgrade', this.handleUpgradeRequest); + + this.taskRunnerWsServer.start(); } private async setupErrorHandlers() {