fix(core): Ensure task runner module and server shut down (no-changelog) (#11801)

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Iván Ovejero 2024-11-20 10:55:19 +01:00 committed by GitHub
parent d15b8d0509
commit 2632b1fb7f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 71 additions and 27 deletions

View file

@ -17,12 +17,14 @@ describe('TaskRunnerWsServer', () => {
mock(),
);
server.start();
expect(setIntervalSpy).toHaveBeenCalledWith(
expect.any(Function),
30 * Time.seconds.toMilliseconds,
);
await server.shutdown();
await server.stop();
});
it('should clear heartbeat timer on server stop', async () => {
@ -36,8 +38,9 @@ describe('TaskRunnerWsServer', () => {
mock<TaskRunnersConfig>({ path: '/runners', heartbeatInterval: 30 }),
mock(),
);
server.start();
await server.shutdown();
await server.stop();
expect(clearIntervalSpy).toHaveBeenCalled();
});

View file

@ -21,6 +21,16 @@ function heartbeat(this: WebSocket) {
this.isAlive = true;
}
const enum WsStatusCode {
CloseNormal = 1000,
CloseGoingAway = 1001,
CloseProtocolError = 1002,
CloseUnsupportedData = 1003,
CloseNoStatus = 1005,
CloseAbnormal = 1006,
CloseInvalidData = 1007,
}
@Service()
export class TaskRunnerWsServer {
runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();
@ -33,7 +43,9 @@ export class TaskRunnerWsServer {
private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer,
private readonly taskTunnersConfig: TaskRunnersConfig,
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
) {
) {}
start() {
this.startHeartbeatChecks();
}
@ -47,7 +59,11 @@ export class TaskRunnerWsServer {
this.heartbeatTimer = setInterval(() => {
for (const [runnerId, connection] of this.runnerConnections.entries()) {
if (!connection.isAlive) {
void this.removeConnection(runnerId, 'failed-heartbeat-check');
void this.removeConnection(
runnerId,
'failed-heartbeat-check',
WsStatusCode.CloseNoStatus,
);
this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check');
return;
}
@ -57,17 +73,13 @@ export class TaskRunnerWsServer {
}, heartbeatInterval * Time.seconds.toMilliseconds);
}
async shutdown() {
async stop() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
}
await Promise.all(
Array.from(this.runnerConnections.keys()).map(
async (id) => await this.removeConnection(id, 'shutting-down'),
),
);
await this.stopConnectedRunners();
}
setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) {
@ -141,7 +153,11 @@ export class TaskRunnerWsServer {
);
}
async removeConnection(id: TaskRunner['id'], reason: DisconnectReason = 'unknown') {
async removeConnection(
id: TaskRunner['id'],
reason: DisconnectReason = 'unknown',
code?: WsStatusCode,
) {
const connection = this.runnerConnections.get(id);
if (connection) {
const disconnectError = await this.disconnectAnalyzer.toDisconnectError({
@ -150,7 +166,7 @@ export class TaskRunnerWsServer {
heartbeatInterval: this.taskTunnersConfig.heartbeatInterval,
});
this.taskBroker.deregisterRunner(id, disconnectError);
connection.close();
connection.close(code);
this.runnerConnections.delete(id);
}
}
@ -158,4 +174,14 @@ export class TaskRunnerWsServer {
handleRequest(req: TaskRunnerServerInitRequest, _res: TaskRunnerServerInitResponse) {
this.add(req.query.id, req.ws);
}
private async stopConnectedRunners() {
// TODO: We should give runners some time to finish their tasks before
// shutting them down
await Promise.all(
Array.from(this.runnerConnections.keys()).map(
async (id) => await this.removeConnection(id, 'shutting-down', WsStatusCode.CloseGoingAway),
),
);
}
}

View file

@ -2,6 +2,7 @@ import { TaskRunnersConfig } from '@n8n/config';
import * as a from 'node:assert/strict';
import Container, { Service } from 'typedi';
import { OnShutdown } from '@/decorators/on-shutdown';
import type { TaskRunnerProcess } from '@/runners/task-runner-process';
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
@ -41,16 +42,23 @@ export class TaskRunnerModule {
}
}
@OnShutdown()
async stop() {
const stopRunnerProcessTask = (async () => {
if (this.taskRunnerProcess) {
await this.taskRunnerProcess.stop();
this.taskRunnerProcess = undefined;
}
})();
const stopRunnerServerTask = (async () => {
if (this.taskRunnerHttpServer) {
await this.taskRunnerHttpServer.stop();
this.taskRunnerHttpServer = undefined;
}
})();
await Promise.all([stopRunnerProcessTask, stopRunnerServerTask]);
}
private async loadTaskManager() {

View file

@ -9,8 +9,7 @@ import { parse as parseUrl } from 'node:url';
import { Service } from 'typedi';
import { Server as WSServer } from 'ws';
import { inTest, LOWEST_SHUTDOWN_PRIORITY } from '@/constants';
import { OnShutdown } from '@/decorators/on-shutdown';
import { inTest } from '@/constants';
import { Logger } from '@/logging/logger.service';
import { bodyParser, rawBodyReader } from '@/middlewares';
import { send } from '@/response-helper';
@ -69,16 +68,22 @@ export class TaskRunnerServer {
this.configureRoutes();
}
@OnShutdown(LOWEST_SHUTDOWN_PRIORITY)
async stop(): Promise<void> {
if (this.wsServer) {
this.wsServer.close();
this.wsServer = undefined;
}
const stopHttpServerTask = (async () => {
if (this.server) {
await new Promise<void>((resolve) => this.server?.close(() => resolve()));
this.server = undefined;
}
})();
const stopWsServerTask = this.taskRunnerWsServer.stop();
await Promise.all([stopHttpServerTask, stopWsServerTask]);
}
/** Creates an HTTP server and listens to the configured port */
@ -119,6 +124,8 @@ export class TaskRunnerServer {
maxPayload: this.globalConfig.taskRunners.maxPayload,
});
this.server.on('upgrade', this.handleUpgradeRequest);
this.taskRunnerWsServer.start();
}
private async setupErrorHandlers() {