From 6a12f0c5fab9e41ab2107ffeff260a388e3b6fda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 4 Oct 2024 12:18:26 +0200 Subject: [PATCH] fix(core)!: Bind worker server to IPv4 (#11087) --- .../config/src/configs/scaling-mode.config.ts | 6 ++- packages/@n8n/config/test/config.test.ts | 1 + packages/cli/BREAKING-CHANGES.md | 10 ++++ packages/cli/src/errors/port-taken.error.ts | 9 ---- .../scaling/__tests__/worker-server.test.ts | 48 +++++++++++-------- packages/cli/src/scaling/worker-server.ts | 13 +++-- 6 files changed, 53 insertions(+), 34 deletions(-) delete mode 100644 packages/cli/src/errors/port-taken.error.ts diff --git a/packages/@n8n/config/src/configs/scaling-mode.config.ts b/packages/@n8n/config/src/configs/scaling-mode.config.ts index 750de77b07..a1f5b2a7d6 100644 --- a/packages/@n8n/config/src/configs/scaling-mode.config.ts +++ b/packages/@n8n/config/src/configs/scaling-mode.config.ts @@ -6,9 +6,13 @@ class HealthConfig { @Env('QUEUE_HEALTH_CHECK_ACTIVE') active: boolean = false; - /** Port for worker to respond to health checks requests on, if enabled. */ + /** Port for worker server to listen on. */ @Env('QUEUE_HEALTH_CHECK_PORT') port: number = 5678; + + /** IP address for worker server to listen on. */ + @Env('N8N_WORKER_SERVER_ADDRESS') + address: string = '0.0.0.0'; } @Config diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 5c5de4e44a..a0952d0dd0 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -198,6 +198,7 @@ describe('GlobalConfig', () => { health: { active: false, port: 5678, + address: '0.0.0.0', }, bull: { redis: { diff --git a/packages/cli/BREAKING-CHANGES.md b/packages/cli/BREAKING-CHANGES.md index 5cffb058e8..012892c6ae 100644 --- a/packages/cli/BREAKING-CHANGES.md +++ b/packages/cli/BREAKING-CHANGES.md @@ -2,6 +2,16 @@ This list shows all the versions which include breaking changes and how to upgrade. +# 1.63.0 + +### What changed? + +The worker server used to bind to IPv6 by default. It now binds to IPv4 by default. + +### When is action necessary? + +If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`. + ## 1.57.0 ### What changed? diff --git a/packages/cli/src/errors/port-taken.error.ts b/packages/cli/src/errors/port-taken.error.ts deleted file mode 100644 index 30c63a679f..0000000000 --- a/packages/cli/src/errors/port-taken.error.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { ApplicationError } from 'n8n-workflow'; - -export class PortTakenError extends ApplicationError { - constructor(port: number) { - super( - `Port ${port} is already in use. Do you already have the n8n main process running on that port?`, - ); - } -} diff --git a/packages/cli/src/scaling/__tests__/worker-server.test.ts b/packages/cli/src/scaling/__tests__/worker-server.test.ts index d5716bfac4..7f3a21a778 100644 --- a/packages/cli/src/scaling/__tests__/worker-server.test.ts +++ b/packages/cli/src/scaling/__tests__/worker-server.test.ts @@ -5,7 +5,6 @@ import type { InstanceSettings } from 'n8n-core'; import { AssertionError } from 'node:assert'; import * as http from 'node:http'; -import { PortTakenError } from '@/errors/port-taken.error'; import type { ExternalHooks } from '@/external-hooks'; import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { bodyParser, rawBodyReader } from '@/middlewares'; @@ -34,7 +33,7 @@ describe('WorkerServer', () => { beforeEach(() => { globalConfig = mock({ queue: { - health: { active: true, port: 5678 }, + health: { active: true, port: 5678, address: '0.0.0.0' }, }, credentials: { overwrite: { endpoint: '' }, @@ -59,8 +58,11 @@ describe('WorkerServer', () => { ).toThrowError(AssertionError); }); - it('should throw if port taken', async () => { + it('should exit if port taken', async () => { const server = mock(); + const procesExitSpy = jest + .spyOn(process, 'exit') + .mockImplementation(() => undefined as never); jest.spyOn(http, 'createServer').mockReturnValue(server); @@ -69,18 +71,19 @@ describe('WorkerServer', () => { return server; }); - expect( - () => - new WorkerServer( - globalConfig, - mock(), - mock(), - mock(), - externalHooks, - instanceSettings, - prometheusMetricsService, - ), - ).toThrowError(PortTakenError); + new WorkerServer( + globalConfig, + mock(), + mock(), + mock(), + externalHooks, + instanceSettings, + prometheusMetricsService, + ); + + expect(procesExitSpy).toHaveBeenCalledWith(1); + + procesExitSpy.mockRestore(); }); }); @@ -89,8 +92,9 @@ describe('WorkerServer', () => { const server = mock(); jest.spyOn(http, 'createServer').mockReturnValue(server); - server.listen.mockImplementation((_port, callback: () => void) => { - callback(); + server.listen.mockImplementation((...args: unknown[]) => { + const callback = args.find((arg) => typeof arg === 'function'); + if (callback) callback(); return server; }); @@ -123,8 +127,9 @@ describe('WorkerServer', () => { const server = mock(); jest.spyOn(http, 'createServer').mockReturnValue(server); - server.listen.mockImplementation((_port, callback: () => void) => { - callback(); + server.listen.mockImplementation((...args: unknown[]) => { + const callback = args.find((arg) => typeof arg === 'function'); + if (callback) callback(); return server; }); @@ -177,8 +182,9 @@ describe('WorkerServer', () => { prometheusMetricsService, ); - server.listen.mockImplementation((_port, callback: () => void) => { - callback(); + server.listen.mockImplementation((...args: unknown[]) => { + const callback = args.find((arg) => typeof arg === 'function'); + if (callback) callback(); return server; }); diff --git a/packages/cli/src/scaling/worker-server.ts b/packages/cli/src/scaling/worker-server.ts index 4d27a82a1c..3343ce4e49 100644 --- a/packages/cli/src/scaling/worker-server.ts +++ b/packages/cli/src/scaling/worker-server.ts @@ -12,7 +12,6 @@ import { CredentialsOverwrites } from '@/credentials-overwrites'; import * as Db from '@/db'; import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error'; import { NonJsonBodyError } from '@/errors/non-json-body.error'; -import { PortTakenError } from '@/errors/port-taken.error'; import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; import { ExternalHooks } from '@/external-hooks'; import type { ICredentialsOverwrite } from '@/interfaces'; @@ -40,6 +39,8 @@ export type WorkerServerEndpointsConfig = { export class WorkerServer { private readonly port: number; + private readonly address: string; + private readonly server: Server; private readonly app: Application; @@ -66,9 +67,15 @@ export class WorkerServer { this.server = http.createServer(this.app); this.port = this.globalConfig.queue.health.port; + this.address = this.globalConfig.queue.health.address; this.server.on('error', (error: NodeJS.ErrnoException) => { - if (error.code === 'EADDRINUSE') throw new PortTakenError(this.port); + if (error.code === 'EADDRINUSE') { + this.logger.error( + `Port ${this.port} is already in use, possibly by the n8n main process server. Please set a different port for the worker server.`, + ); + process.exit(1); + } }); } @@ -79,7 +86,7 @@ export class WorkerServer { await this.mountEndpoints(); - await new Promise((resolve) => this.server.listen(this.port, resolve)); + await new Promise((resolve) => this.server.listen(this.port, this.address, resolve)); await this.externalHooks.run('worker.ready');