fix(core)!: Bind worker server to IPv4 (#11087)

This commit is contained in:
Iván Ovejero 2024-10-04 12:18:26 +02:00 committed by GitHub
parent d974b015d0
commit 6a12f0c5fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 53 additions and 34 deletions

View file

@ -6,9 +6,13 @@ class HealthConfig {
@Env('QUEUE_HEALTH_CHECK_ACTIVE') @Env('QUEUE_HEALTH_CHECK_ACTIVE')
active: boolean = false; active: boolean = false;
/** Port for worker to respond to health checks requests on, if enabled. */ /** Port for worker server to listen on. */
@Env('QUEUE_HEALTH_CHECK_PORT') @Env('QUEUE_HEALTH_CHECK_PORT')
port: number = 5678; port: number = 5678;
/** IP address for worker server to listen on. */
@Env('N8N_WORKER_SERVER_ADDRESS')
address: string = '0.0.0.0';
} }
@Config @Config

View file

@ -198,6 +198,7 @@ describe('GlobalConfig', () => {
health: { health: {
active: false, active: false,
port: 5678, port: 5678,
address: '0.0.0.0',
}, },
bull: { bull: {
redis: { redis: {

View file

@ -2,6 +2,16 @@
This list shows all the versions which include breaking changes and how to upgrade. This list shows all the versions which include breaking changes and how to upgrade.
# 1.63.0
### What changed?
The worker server used to bind to IPv6 by default. It now binds to IPv4 by default.
### When is action necessary?
If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`.
## 1.57.0 ## 1.57.0
### What changed? ### What changed?

View file

@ -1,9 +0,0 @@
import { ApplicationError } from 'n8n-workflow';
export class PortTakenError extends ApplicationError {
constructor(port: number) {
super(
`Port ${port} is already in use. Do you already have the n8n main process running on that port?`,
);
}
}

View file

@ -5,7 +5,6 @@ import type { InstanceSettings } from 'n8n-core';
import { AssertionError } from 'node:assert'; import { AssertionError } from 'node:assert';
import * as http from 'node:http'; import * as http from 'node:http';
import { PortTakenError } from '@/errors/port-taken.error';
import type { ExternalHooks } from '@/external-hooks'; import type { ExternalHooks } from '@/external-hooks';
import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
import { bodyParser, rawBodyReader } from '@/middlewares'; import { bodyParser, rawBodyReader } from '@/middlewares';
@ -34,7 +33,7 @@ describe('WorkerServer', () => {
beforeEach(() => { beforeEach(() => {
globalConfig = mock<GlobalConfig>({ globalConfig = mock<GlobalConfig>({
queue: { queue: {
health: { active: true, port: 5678 }, health: { active: true, port: 5678, address: '0.0.0.0' },
}, },
credentials: { credentials: {
overwrite: { endpoint: '' }, overwrite: { endpoint: '' },
@ -59,8 +58,11 @@ describe('WorkerServer', () => {
).toThrowError(AssertionError); ).toThrowError(AssertionError);
}); });
it('should throw if port taken', async () => { it('should exit if port taken', async () => {
const server = mock<http.Server>(); const server = mock<http.Server>();
const procesExitSpy = jest
.spyOn(process, 'exit')
.mockImplementation(() => undefined as never);
jest.spyOn(http, 'createServer').mockReturnValue(server); jest.spyOn(http, 'createServer').mockReturnValue(server);
@ -69,18 +71,19 @@ describe('WorkerServer', () => {
return server; return server;
}); });
expect( new WorkerServer(
() => globalConfig,
new WorkerServer( mock(),
globalConfig, mock(),
mock(), mock(),
mock(), externalHooks,
mock(), instanceSettings,
externalHooks, prometheusMetricsService,
instanceSettings, );
prometheusMetricsService,
), expect(procesExitSpy).toHaveBeenCalledWith(1);
).toThrowError(PortTakenError);
procesExitSpy.mockRestore();
}); });
}); });
@ -89,8 +92,9 @@ describe('WorkerServer', () => {
const server = mock<http.Server>(); const server = mock<http.Server>();
jest.spyOn(http, 'createServer').mockReturnValue(server); jest.spyOn(http, 'createServer').mockReturnValue(server);
server.listen.mockImplementation((_port, callback: () => void) => { server.listen.mockImplementation((...args: unknown[]) => {
callback(); const callback = args.find((arg) => typeof arg === 'function');
if (callback) callback();
return server; return server;
}); });
@ -123,8 +127,9 @@ describe('WorkerServer', () => {
const server = mock<http.Server>(); const server = mock<http.Server>();
jest.spyOn(http, 'createServer').mockReturnValue(server); jest.spyOn(http, 'createServer').mockReturnValue(server);
server.listen.mockImplementation((_port, callback: () => void) => { server.listen.mockImplementation((...args: unknown[]) => {
callback(); const callback = args.find((arg) => typeof arg === 'function');
if (callback) callback();
return server; return server;
}); });
@ -177,8 +182,9 @@ describe('WorkerServer', () => {
prometheusMetricsService, prometheusMetricsService,
); );
server.listen.mockImplementation((_port, callback: () => void) => { server.listen.mockImplementation((...args: unknown[]) => {
callback(); const callback = args.find((arg) => typeof arg === 'function');
if (callback) callback();
return server; return server;
}); });

View file

@ -12,7 +12,6 @@ import { CredentialsOverwrites } from '@/credentials-overwrites';
import * as Db from '@/db'; import * as Db from '@/db';
import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error'; import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error';
import { NonJsonBodyError } from '@/errors/non-json-body.error'; import { NonJsonBodyError } from '@/errors/non-json-body.error';
import { PortTakenError } from '@/errors/port-taken.error';
import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error';
import { ExternalHooks } from '@/external-hooks'; import { ExternalHooks } from '@/external-hooks';
import type { ICredentialsOverwrite } from '@/interfaces'; import type { ICredentialsOverwrite } from '@/interfaces';
@ -40,6 +39,8 @@ export type WorkerServerEndpointsConfig = {
export class WorkerServer { export class WorkerServer {
private readonly port: number; private readonly port: number;
private readonly address: string;
private readonly server: Server; private readonly server: Server;
private readonly app: Application; private readonly app: Application;
@ -66,9 +67,15 @@ export class WorkerServer {
this.server = http.createServer(this.app); this.server = http.createServer(this.app);
this.port = this.globalConfig.queue.health.port; this.port = this.globalConfig.queue.health.port;
this.address = this.globalConfig.queue.health.address;
this.server.on('error', (error: NodeJS.ErrnoException) => { this.server.on('error', (error: NodeJS.ErrnoException) => {
if (error.code === 'EADDRINUSE') throw new PortTakenError(this.port); if (error.code === 'EADDRINUSE') {
this.logger.error(
`Port ${this.port} is already in use, possibly by the n8n main process server. Please set a different port for the worker server.`,
);
process.exit(1);
}
}); });
} }
@ -79,7 +86,7 @@ export class WorkerServer {
await this.mountEndpoints(); await this.mountEndpoints();
await new Promise<void>((resolve) => this.server.listen(this.port, resolve)); await new Promise<void>((resolve) => this.server.listen(this.port, this.address, resolve));
await this.externalHooks.run('worker.ready'); await this.externalHooks.run('worker.ready');