fix(core): Correct invalid WS status code on removing connection (#11901)

This commit is contained in:
Iván Ovejero 2024-11-26 17:22:41 +01:00 committed by GitHub
parent 97269a3703
commit 1d80225d26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 30 additions and 14 deletions

View file

@ -164,3 +164,13 @@ export const LOWEST_SHUTDOWN_PRIORITY = 0;
export const DEFAULT_SHUTDOWN_PRIORITY = 100; export const DEFAULT_SHUTDOWN_PRIORITY = 100;
/** Highest priority, meaning shut down happens before all other groups */ /** Highest priority, meaning shut down happens before all other groups */
export const HIGHEST_SHUTDOWN_PRIORITY = 200; export const HIGHEST_SHUTDOWN_PRIORITY = 200;
export const WsStatusCodes = {
CloseNormal: 1000,
CloseGoingAway: 1001,
CloseProtocolError: 1002,
CloseUnsupportedData: 1003,
CloseNoStatus: 1005,
CloseAbnormal: 1006,
CloseInvalidData: 1007,
} as const;

View file

@ -1,10 +1,23 @@
import type { TaskRunnersConfig } from '@n8n/config'; import type { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type WebSocket from 'ws';
import { Time } from '@/constants'; import { Time, WsStatusCodes } from '@/constants';
import { TaskRunnerWsServer } from '@/runners/runner-ws-server'; import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
describe('TaskRunnerWsServer', () => { describe('TaskRunnerWsServer', () => {
describe('removeConnection', () => {
it('should close with 1000 status code by default', async () => {
const server = new TaskRunnerWsServer(mock(), mock(), mock(), mock(), mock());
const ws = mock<WebSocket>();
server.runnerConnections.set('test-runner', ws);
await server.removeConnection('test-runner');
expect(ws.close).toHaveBeenCalledWith(WsStatusCodes.CloseNormal);
});
});
describe('heartbeat timer', () => { describe('heartbeat timer', () => {
it('should set up heartbeat timer on server start', async () => { it('should set up heartbeat timer on server start', async () => {
const setIntervalSpy = jest.spyOn(global, 'setInterval'); const setIntervalSpy = jest.spyOn(global, 'setInterval');

View file

@ -4,7 +4,7 @@ import { ApplicationError } from 'n8n-workflow';
import { Service } from 'typedi'; import { Service } from 'typedi';
import type WebSocket from 'ws'; import type WebSocket from 'ws';
import { Time } from '@/constants'; import { Time, WsStatusCodes } from '@/constants';
import { Logger } from '@/logging/logger.service'; import { Logger } from '@/logging/logger.service';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
@ -21,15 +21,7 @@ function heartbeat(this: WebSocket) {
this.isAlive = true; this.isAlive = true;
} }
const enum WsStatusCode { type WsStatusCode = (typeof WsStatusCodes)[keyof typeof WsStatusCodes];
CloseNormal = 1000,
CloseGoingAway = 1001,
CloseProtocolError = 1002,
CloseUnsupportedData = 1003,
CloseNoStatus = 1005,
CloseAbnormal = 1006,
CloseInvalidData = 1007,
}
@Service() @Service()
export class TaskRunnerWsServer { export class TaskRunnerWsServer {
@ -62,7 +54,7 @@ export class TaskRunnerWsServer {
void this.removeConnection( void this.removeConnection(
runnerId, runnerId,
'failed-heartbeat-check', 'failed-heartbeat-check',
WsStatusCode.CloseNoStatus, WsStatusCodes.CloseNoStatus,
); );
this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check'); this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check');
return; return;
@ -156,7 +148,7 @@ export class TaskRunnerWsServer {
async removeConnection( async removeConnection(
id: TaskRunner['id'], id: TaskRunner['id'],
reason: DisconnectReason = 'unknown', reason: DisconnectReason = 'unknown',
code?: WsStatusCode, code: WsStatusCode = WsStatusCodes.CloseNormal,
) { ) {
const connection = this.runnerConnections.get(id); const connection = this.runnerConnections.get(id);
if (connection) { if (connection) {
@ -181,7 +173,8 @@ export class TaskRunnerWsServer {
// shutting them down // shutting them down
await Promise.all( await Promise.all(
Array.from(this.runnerConnections.keys()).map( Array.from(this.runnerConnections.keys()).map(
async (id) => await this.removeConnection(id, 'shutting-down', WsStatusCode.CloseGoingAway), async (id) =>
await this.removeConnection(id, 'shutting-down', WsStatusCodes.CloseGoingAway),
), ),
); );
} }