diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 38bde018ea..89e131bc0d 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -1,22 +1,15 @@ import { Flags, type Config } from '@oclif/core'; -import express from 'express'; -import http from 'http'; import { ApplicationError } from 'n8n-workflow'; import { Container } from 'typedi'; import config from '@/config'; import { N8N_VERSION, inTest } from '@/constants'; -import { CredentialsOverwrites } from '@/credentials-overwrites'; -import * as Db from '@/db'; -import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; -import type { ICredentialsOverwrite } from '@/interfaces'; -import { rawBodyReader, bodyParser } from '@/middlewares'; -import * as ResponseHelper from '@/response-helper'; import { JobProcessor } from '@/scaling/job-processor'; import type { ScalingService } from '@/scaling/scaling.service'; +import { WorkerServer } from '@/scaling/worker-server'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import type { RedisServicePubSubSubscriber } from '@/services/redis/redis-service-pub-sub-subscriber'; @@ -164,118 +157,17 @@ export class Worker extends BaseCommand { this.jobProcessor = Container.get(JobProcessor); } - async setupHealthMonitor() { - const { port } = this.globalConfig.queue.health; - - const app = express(); - app.disable('x-powered-by'); - - const server = http.createServer(app); - - app.get('/healthz/readiness', async (_req, res) => { - return Db.connectionState.connected && Db.connectionState.migrated - ? res.status(200).send({ status: 'ok' }) - : res.status(503).send({ status: 'error' }); - }); - - app.get( - '/healthz', - - async (_req: express.Request, res: express.Response) => { - this.logger.debug('Health check started!'); - - const connection = Db.getConnection(); - - try { - if (!connection.isInitialized) { - // Connection is not active - throw new ApplicationError('No active database connection'); - } - // DB ping - await connection.query('SELECT 1'); - } catch (e) { - this.logger.error('No Database connection!', e as Error); - const error = new ServiceUnavailableError('No Database connection!'); - return ResponseHelper.sendErrorResponse(res, error); - } - - // Just to be complete, generally will the worker stop automatically - // if it loses the connection to redis - try { - // Redis ping - await this.scalingService.pingQueue(); - } catch (e) { - this.logger.error('No Redis connection!', e as Error); - const error = new ServiceUnavailableError('No Redis connection!'); - return ResponseHelper.sendErrorResponse(res, error); - } - - // Everything fine - const responseData = { - status: 'ok', - }; - - this.logger.debug('Health check completed successfully!'); - - ResponseHelper.sendSuccessResponse(res, responseData, true, 200); - }, - ); - - let presetCredentialsLoaded = false; - - const endpointPresetCredentials = this.globalConfig.credentials.overwrite.endpoint; - if (endpointPresetCredentials !== '') { - // POST endpoint to set preset credentials - app.post( - `/${endpointPresetCredentials}`, - rawBodyReader, - bodyParser, - async (req: express.Request, res: express.Response) => { - if (!presetCredentialsLoaded) { - const body = req.body as ICredentialsOverwrite; - - if (req.contentType !== 'application/json') { - ResponseHelper.sendErrorResponse( - res, - new Error( - 'Body must be a valid JSON, make sure the content-type is application/json', - ), - ); - return; - } - - Container.get(CredentialsOverwrites).setData(body); - presetCredentialsLoaded = true; - ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); - } else { - ResponseHelper.sendErrorResponse(res, new Error('Preset credentials can be set once')); - } - }, - ); - } - - server.on('error', (error: Error & { code: string }) => { - if (error.code === 'EADDRINUSE') { - this.logger.error( - `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, - ); - process.exit(1); - } - }); - - await new Promise((resolve) => server.listen(port, () => resolve())); - await this.externalHooks?.run('worker.ready'); - this.logger.info(`\nn8n worker health check via, port ${port}`); - } - async run() { this.logger.info('\nn8n worker is now ready'); this.logger.info(` * Version: ${N8N_VERSION}`); this.logger.info(` * Concurrency: ${this.concurrency}`); this.logger.info(''); - if (this.globalConfig.queue.health.active) { - await this.setupHealthMonitor(); + if ( + this.globalConfig.queue.health.active || + this.globalConfig.credentials.overwrite.endpoint !== '' + ) { + await Container.get(WorkerServer).init(); } if (!inTest && process.stdout.isTTY) { diff --git a/packages/cli/src/errors/credentials-overwrites-already-set.error.ts b/packages/cli/src/errors/credentials-overwrites-already-set.error.ts new file mode 100644 index 0000000000..4c7534c4c1 --- /dev/null +++ b/packages/cli/src/errors/credentials-overwrites-already-set.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class CredentialsOverwritesAlreadySetError extends ApplicationError { + constructor() { + super('Credentials overwrites may not be set more than once.'); + } +} diff --git a/packages/cli/src/errors/non-json-body.error.ts b/packages/cli/src/errors/non-json-body.error.ts new file mode 100644 index 0000000000..cd2086dbf5 --- /dev/null +++ b/packages/cli/src/errors/non-json-body.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class NonJsonBodyError extends ApplicationError { + constructor() { + super('Body must be valid JSON. Please make sure `content-type` is `application/json`.'); + } +} diff --git a/packages/cli/src/errors/port-taken.error.ts b/packages/cli/src/errors/port-taken.error.ts new file mode 100644 index 0000000000..30c63a679f --- /dev/null +++ b/packages/cli/src/errors/port-taken.error.ts @@ -0,0 +1,9 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class PortTakenError extends ApplicationError { + constructor(port: number) { + super( + `Port ${port} is already in use. Do you already have the n8n main process running on that port?`, + ); + } +} diff --git a/packages/cli/src/scaling/__tests__/worker-server.test.ts b/packages/cli/src/scaling/__tests__/worker-server.test.ts new file mode 100644 index 0000000000..7628420975 --- /dev/null +++ b/packages/cli/src/scaling/__tests__/worker-server.test.ts @@ -0,0 +1,127 @@ +import type { GlobalConfig } from '@n8n/config'; +import type express from 'express'; +import { mock } from 'jest-mock-extended'; +import { AssertionError } from 'node:assert'; +import * as http from 'node:http'; + +import config from '@/config'; +import { PortTakenError } from '@/errors/port-taken.error'; +import type { ExternalHooks } from '@/external-hooks'; +import { bodyParser, rawBodyReader } from '@/middlewares'; + +import { WorkerServer } from '../worker-server'; + +const app = mock(); + +jest.mock('node:http'); +jest.mock('express', () => ({ __esModule: true, default: () => app })); + +const addressInUseError = () => { + const error: NodeJS.ErrnoException = new Error('Port already in use'); + error.code = 'EADDRINUSE'; + + return error; +}; + +describe('WorkerServer', () => { + let globalConfig: GlobalConfig; + + const externalHooks = mock(); + + beforeEach(() => { + config.set('generic.instanceType', 'worker'); + globalConfig = mock({ + queue: { + health: { active: true, port: 5678 }, + }, + credentials: { + overwrite: { endpoint: '' }, + }, + }); + jest.restoreAllMocks(); + }); + + describe('constructor', () => { + it('should throw if non-worker instance type', () => { + config.set('generic.instanceType', 'webhook'); + + expect( + () => new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks), + ).toThrowError(AssertionError); + }); + + it('should throw if port taken', async () => { + const server = mock(); + + jest.spyOn(http, 'createServer').mockReturnValue(server); + + server.on.mockImplementation((event: string, callback: (arg?: unknown) => void) => { + if (event === 'error') callback(addressInUseError()); + return server; + }); + + expect( + () => new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks), + ).toThrowError(PortTakenError); + }); + + it('should set up `/healthz` if health check is enabled', async () => { + jest.spyOn(http, 'createServer').mockReturnValue(mock()); + + new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks); + + expect(app.get).toHaveBeenCalledWith('/healthz', expect.any(Function)); + }); + + it('should not set up `/healthz` if health check is disabled', async () => { + globalConfig.queue.health.active = false; + + jest.spyOn(http, 'createServer').mockReturnValue(mock()); + + new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks); + + expect(app.get).not.toHaveBeenCalled(); + }); + + it('should set up `/:endpoint` if overwrites endpoint is set', async () => { + jest.spyOn(http, 'createServer').mockReturnValue(mock()); + + const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites'; + globalConfig.credentials.overwrite.endpoint = CREDENTIALS_OVERWRITE_ENDPOINT; + + new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks); + + expect(app.post).toHaveBeenCalledWith( + `/${CREDENTIALS_OVERWRITE_ENDPOINT}`, + rawBodyReader, + bodyParser, + expect.any(Function), + ); + }); + + it('should not set up `/:endpoint` if overwrites endpoint is not set', async () => { + jest.spyOn(http, 'createServer').mockReturnValue(mock()); + + new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks); + + expect(app.post).not.toHaveBeenCalled(); + }); + }); + + describe('init', () => { + it('should call `worker.ready` external hook', async () => { + const server = mock(); + jest.spyOn(http, 'createServer').mockReturnValue(server); + + server.listen.mockImplementation((_port, callback: () => void) => { + callback(); + return server; + }); + + const workerServer = new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks); + await workerServer.init(); + + expect(externalHooks.run).toHaveBeenCalledWith('worker.ready'); + }); + }); +}); diff --git a/packages/cli/src/scaling/worker-server.ts b/packages/cli/src/scaling/worker-server.ts new file mode 100644 index 0000000000..2727fa4733 --- /dev/null +++ b/packages/cli/src/scaling/worker-server.ts @@ -0,0 +1,129 @@ +import { GlobalConfig } from '@n8n/config'; +import express from 'express'; +import { ensureError } from 'n8n-workflow'; +import { strict as assert } from 'node:assert'; +import http from 'node:http'; +import type { Server } from 'node:http'; +import { Service } from 'typedi'; + +import config from '@/config'; +import { CredentialsOverwrites } from '@/credentials-overwrites'; +import * as Db from '@/db'; +import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error'; +import { NonJsonBodyError } from '@/errors/non-json-body.error'; +import { PortTakenError } from '@/errors/port-taken.error'; +import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; +import { ExternalHooks } from '@/external-hooks'; +import type { ICredentialsOverwrite } from '@/interfaces'; +import { Logger } from '@/logger'; +import { rawBodyReader, bodyParser } from '@/middlewares'; +import * as ResponseHelper from '@/response-helper'; +import { ScalingService } from '@/scaling/scaling.service'; + +/** + * Responsible for handling HTTP requests sent to a worker. + */ +@Service() +export class WorkerServer { + private readonly port: number; + + private readonly server: Server; + + /** + * @doc https://docs.n8n.io/embed/configuration/#credential-overwrites + */ + private overwritesLoaded = false; + + constructor( + private readonly globalConfig: GlobalConfig, + private readonly logger: Logger, + private readonly scalingService: ScalingService, + private readonly credentialsOverwrites: CredentialsOverwrites, + private readonly externalHooks: ExternalHooks, + ) { + assert(config.getEnv('generic.instanceType') === 'worker'); + + const app = express(); + + app.disable('x-powered-by'); + + this.server = http.createServer(app); + + this.port = this.globalConfig.queue.health.port; + + const overwritesEndpoint = this.globalConfig.credentials.overwrite.endpoint; + + this.server.on('error', (error: NodeJS.ErrnoException) => { + if (error.code === 'EADDRINUSE') throw new PortTakenError(this.port); + }); + + if (this.globalConfig.queue.health.active) { + app.get('/healthz', async (req, res) => await this.healthcheck(req, res)); + } + + if (overwritesEndpoint !== '') { + app.post(`/${overwritesEndpoint}`, rawBodyReader, bodyParser, (req, res) => + this.handleOverwrites(req, res), + ); + } + } + + async init() { + await new Promise((resolve) => this.server.listen(this.port, resolve)); + + await this.externalHooks.run('worker.ready'); + + this.logger.info(`\nn8n worker server listening on port ${this.port}`); + } + + private async healthcheck(_req: express.Request, res: express.Response) { + this.logger.debug('[WorkerServer] Health check started'); + + try { + await Db.getConnection().query('SELECT 1'); + } catch (value) { + this.logger.error('[WorkerServer] No database connection', ensureError(value)); + + return ResponseHelper.sendErrorResponse( + res, + new ServiceUnavailableError('No database connection'), + ); + } + + try { + await this.scalingService.pingQueue(); + } catch (value) { + this.logger.error('[WorkerServer] No Redis connection', ensureError(value)); + + return ResponseHelper.sendErrorResponse( + res, + new ServiceUnavailableError('No Redis connection'), + ); + } + + this.logger.debug('[WorkerServer] Health check succeeded'); + + ResponseHelper.sendSuccessResponse(res, { status: 'ok' }, true, 200); + } + + private handleOverwrites( + req: express.Request<{}, {}, ICredentialsOverwrite>, + res: express.Response, + ) { + if (this.overwritesLoaded) { + ResponseHelper.sendErrorResponse(res, new CredentialsOverwritesAlreadySetError()); + return; + } + + if (req.contentType !== 'application/json') { + ResponseHelper.sendErrorResponse(res, new NonJsonBodyError()); + return; + } + + this.credentialsOverwrites.setData(req.body); + + this.overwritesLoaded = true; + + ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); + } +}