diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 4b719e8443..f5f6b2b79b 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -10,6 +10,7 @@ import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; import { JobProcessor } from '@/scaling/job-processor'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import type { ScalingService } from '@/scaling/scaling.service'; +import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; @@ -160,12 +161,15 @@ export class Worker extends BaseCommand { this.logger.info(` * Concurrency: ${this.concurrency}`); this.logger.info(''); - if ( - this.globalConfig.queue.health.active || - this.globalConfig.credentials.overwrite.endpoint !== '' - ) { + const endpointsConfig: WorkerServerEndpointsConfig = { + health: this.globalConfig.queue.health.active, + overwrites: this.globalConfig.credentials.overwrite.endpoint !== '', + metrics: this.globalConfig.endpoints.metrics.enable, + }; + + if (Object.values(endpointsConfig).some((e) => e)) { const { WorkerServer } = await import('@/scaling/worker-server'); - await Container.get(WorkerServer).init(); + await Container.get(WorkerServer).init(endpointsConfig); } if (!inTest && process.stdout.isTTY) { diff --git a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts index f9e8558990..d78116f462 100644 --- a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts +++ b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts @@ -2,6 +2,7 @@ import { GlobalConfig } from '@n8n/config'; import type express from 'express'; import promBundle from 'express-prom-bundle'; import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; import promClient from 'prom-client'; import config from '@/config'; @@ -43,11 +44,13 @@ describe('PrometheusMetricsService', () => { const app = mock(); const eventBus = mock(); const eventService = mock(); + const instanceSettings = mock({ instanceType: 'main' }); const prometheusMetricsService = new PrometheusMetricsService( mock(), eventBus, globalConfig, eventService, + instanceSettings, ); afterEach(() => { @@ -64,6 +67,7 @@ describe('PrometheusMetricsService', () => { mock(), customGlobalConfig, mock(), + instanceSettings, ); await customPrometheusMetricsService.init(app); @@ -204,5 +208,18 @@ describe('PrometheusMetricsService', () => { expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics expect(eventService.on).not.toHaveBeenCalled(); }); + + it('should not set up queue metrics if enabled and on scaling mode but instance is not main', async () => { + config.set('executions.mode', 'queue'); + prometheusMetricsService.enableMetric('queue'); + // @ts-expect-error private field + instanceSettings.instanceType = 'worker'; + + await prometheusMetricsService.init(app); + + expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric + expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics + expect(eventService.on).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/cli/src/metrics/prometheus-metrics.service.ts b/packages/cli/src/metrics/prometheus-metrics.service.ts index 11b353437a..2565b0a6b1 100644 --- a/packages/cli/src/metrics/prometheus-metrics.service.ts +++ b/packages/cli/src/metrics/prometheus-metrics.service.ts @@ -1,6 +1,7 @@ import { GlobalConfig } from '@n8n/config'; import type express from 'express'; import promBundle from 'express-prom-bundle'; +import { InstanceSettings } from 'n8n-core'; import { EventMessageTypeNames } from 'n8n-workflow'; import promClient, { type Counter, type Gauge } from 'prom-client'; import semverParse from 'semver/functions/parse'; @@ -22,6 +23,7 @@ export class PrometheusMetricsService { private readonly eventBus: MessageEventBus, private readonly globalConfig: GlobalConfig, private readonly eventService: EventService, + private readonly instanceSettings: InstanceSettings, ) {} private readonly counters: { [key: string]: Counter | null } = {}; @@ -227,7 +229,13 @@ export class PrometheusMetricsService { } private initQueueMetrics() { - if (!this.includes.metrics.queue || config.getEnv('executions.mode') !== 'queue') return; + if ( + !this.includes.metrics.queue || + config.getEnv('executions.mode') !== 'queue' || + this.instanceSettings.instanceType !== 'main' + ) { + return; + } this.gauges.waiting = new promClient.Gauge({ name: this.prefix + 'scaling_mode_queue_jobs_waiting', diff --git a/packages/cli/src/scaling/__tests__/worker-server.test.ts b/packages/cli/src/scaling/__tests__/worker-server.test.ts index 753a1e3e18..d5716bfac4 100644 --- a/packages/cli/src/scaling/__tests__/worker-server.test.ts +++ b/packages/cli/src/scaling/__tests__/worker-server.test.ts @@ -7,6 +7,7 @@ import * as http from 'node:http'; import { PortTakenError } from '@/errors/port-taken.error'; import type { ExternalHooks } from '@/external-hooks'; +import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { bodyParser, rawBodyReader } from '@/middlewares'; import { WorkerServer } from '../worker-server'; @@ -28,6 +29,7 @@ describe('WorkerServer', () => { const externalHooks = mock(); const instanceSettings = mock({ instanceType: 'worker' }); + const prometheusMetricsService = mock(); beforeEach(() => { globalConfig = mock({ @@ -52,6 +54,7 @@ describe('WorkerServer', () => { mock(), externalHooks, mock({ instanceType: 'webhook' }), + prometheusMetricsService, ), ).toThrowError(AssertionError); }); @@ -68,55 +71,21 @@ describe('WorkerServer', () => { expect( () => - new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks, instanceSettings), + new WorkerServer( + globalConfig, + mock(), + mock(), + mock(), + externalHooks, + instanceSettings, + prometheusMetricsService, + ), ).toThrowError(PortTakenError); }); - - it('should set up `/healthz` if health check is enabled', async () => { - jest.spyOn(http, 'createServer').mockReturnValue(mock()); - - new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks, instanceSettings); - - expect(app.get).toHaveBeenCalledWith('/healthz', expect.any(Function)); - }); - - it('should not set up `/healthz` if health check is disabled', async () => { - globalConfig.queue.health.active = false; - - jest.spyOn(http, 'createServer').mockReturnValue(mock()); - - new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks, instanceSettings); - - expect(app.get).not.toHaveBeenCalled(); - }); - - it('should set up `/:endpoint` if overwrites endpoint is set', async () => { - jest.spyOn(http, 'createServer').mockReturnValue(mock()); - - const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites'; - globalConfig.credentials.overwrite.endpoint = CREDENTIALS_OVERWRITE_ENDPOINT; - - new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks, instanceSettings); - - expect(app.post).toHaveBeenCalledWith( - `/${CREDENTIALS_OVERWRITE_ENDPOINT}`, - rawBodyReader, - bodyParser, - expect.any(Function), - ); - }); - - it('should not set up `/:endpoint` if overwrites endpoint is not set', async () => { - jest.spyOn(http, 'createServer').mockReturnValue(mock()); - - new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks, instanceSettings); - - expect(app.post).not.toHaveBeenCalled(); - }); }); describe('init', () => { - it('should call `worker.ready` external hook', async () => { + it('should mount all endpoints when all are enabled', async () => { const server = mock(); jest.spyOn(http, 'createServer').mockReturnValue(server); @@ -132,8 +101,88 @@ describe('WorkerServer', () => { mock(), externalHooks, instanceSettings, + prometheusMetricsService, ); - await workerServer.init(); + + const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites'; + globalConfig.credentials.overwrite.endpoint = CREDENTIALS_OVERWRITE_ENDPOINT; + + await workerServer.init({ health: true, overwrites: true, metrics: true }); + + expect(app.get).toHaveBeenCalledWith('/healthz', expect.any(Function)); + expect(app.post).toHaveBeenCalledWith( + `/${CREDENTIALS_OVERWRITE_ENDPOINT}`, + rawBodyReader, + bodyParser, + expect.any(Function), + ); + expect(prometheusMetricsService.init).toHaveBeenCalledWith(app); + }); + + it('should mount only health and overwrites endpoints if only those are enabled', async () => { + const server = mock(); + jest.spyOn(http, 'createServer').mockReturnValue(server); + + server.listen.mockImplementation((_port, callback: () => void) => { + callback(); + return server; + }); + + const workerServer = new WorkerServer( + globalConfig, + mock(), + mock(), + mock(), + externalHooks, + instanceSettings, + prometheusMetricsService, + ); + + await workerServer.init({ health: true, overwrites: false, metrics: true }); + + expect(app.get).toHaveBeenCalledWith('/healthz', expect.any(Function)); + expect(app.post).not.toHaveBeenCalled(); + expect(prometheusMetricsService.init).toHaveBeenCalledWith(app); + }); + + it('should throw if no endpoints are enabled', async () => { + const server = mock(); + jest.spyOn(http, 'createServer').mockReturnValue(server); + + const workerServer = new WorkerServer( + globalConfig, + mock(), + mock(), + mock(), + externalHooks, + instanceSettings, + prometheusMetricsService, + ); + await expect( + workerServer.init({ health: false, overwrites: false, metrics: false }), + ).rejects.toThrowError(AssertionError); + }); + + it('should call `worker.ready` external hook', async () => { + const server = mock(); + jest.spyOn(http, 'createServer').mockReturnValue(server); + + const workerServer = new WorkerServer( + globalConfig, + mock(), + mock(), + mock(), + externalHooks, + instanceSettings, + prometheusMetricsService, + ); + + server.listen.mockImplementation((_port, callback: () => void) => { + callback(); + return server; + }); + + await workerServer.init({ health: true, overwrites: true, metrics: true }); expect(externalHooks.run).toHaveBeenCalledWith('worker.ready'); }); diff --git a/packages/cli/src/scaling/worker-server.ts b/packages/cli/src/scaling/worker-server.ts index cc8d463951..abc6a3a024 100644 --- a/packages/cli/src/scaling/worker-server.ts +++ b/packages/cli/src/scaling/worker-server.ts @@ -1,4 +1,5 @@ import { GlobalConfig } from '@n8n/config'; +import type { Application } from 'express'; import express from 'express'; import { InstanceSettings } from 'n8n-core'; import { ensureError } from 'n8n-workflow'; @@ -16,10 +17,22 @@ import { ServiceUnavailableError } from '@/errors/response-errors/service-unavai import { ExternalHooks } from '@/external-hooks'; import type { ICredentialsOverwrite } from '@/interfaces'; import { Logger } from '@/logger'; +import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { rawBodyReader, bodyParser } from '@/middlewares'; import * as ResponseHelper from '@/response-helper'; import { ScalingService } from '@/scaling/scaling.service'; +export type WorkerServerEndpointsConfig = { + /** Whether the `/healthz` endpoint is enabled. */ + health: boolean; + + /** Whether the [credentials overwrites endpoint](https://docs.n8n.io/embed/configuration/#credential-overwrites) is enabled. */ + overwrites: boolean; + + /** Whether the `/metrics` endpoint is enabled. */ + metrics: boolean; +}; + /** * Responsible for handling HTTP requests sent to a worker. */ @@ -29,9 +42,10 @@ export class WorkerServer { private readonly server: Server; - /** - * @doc https://docs.n8n.io/embed/configuration/#credential-overwrites - */ + private readonly app: Application; + + private endpointsConfig: WorkerServerEndpointsConfig; + private overwritesLoaded = false; constructor( @@ -41,35 +55,30 @@ export class WorkerServer { private readonly credentialsOverwrites: CredentialsOverwrites, private readonly externalHooks: ExternalHooks, private readonly instanceSettings: InstanceSettings, + private readonly prometheusMetricsService: PrometheusMetricsService, ) { assert(this.instanceSettings.instanceType === 'worker'); - const app = express(); + this.app = express(); - app.disable('x-powered-by'); + this.app.disable('x-powered-by'); - this.server = http.createServer(app); + this.server = http.createServer(this.app); this.port = this.globalConfig.queue.health.port; - const overwritesEndpoint = this.globalConfig.credentials.overwrite.endpoint; - this.server.on('error', (error: NodeJS.ErrnoException) => { if (error.code === 'EADDRINUSE') throw new PortTakenError(this.port); }); - - if (this.globalConfig.queue.health.active) { - app.get('/healthz', async (req, res) => await this.healthcheck(req, res)); - } - - if (overwritesEndpoint !== '') { - app.post(`/${overwritesEndpoint}`, rawBodyReader, bodyParser, (req, res) => - this.handleOverwrites(req, res), - ); - } } - async init() { + async init(endpointsConfig: WorkerServerEndpointsConfig) { + assert(Object.values(endpointsConfig).some((e) => e)); + + this.endpointsConfig = endpointsConfig; + + await this.mountEndpoints(); + await new Promise((resolve) => this.server.listen(this.port, resolve)); await this.externalHooks.run('worker.ready'); @@ -77,6 +86,24 @@ export class WorkerServer { this.logger.info(`\nn8n worker server listening on port ${this.port}`); } + private async mountEndpoints() { + if (this.endpointsConfig.health) { + this.app.get('/healthz', async (req, res) => await this.healthcheck(req, res)); + } + + if (this.endpointsConfig.overwrites) { + const { endpoint } = this.globalConfig.credentials.overwrite; + + this.app.post(`/${endpoint}`, rawBodyReader, bodyParser, (req, res) => + this.handleOverwrites(req, res), + ); + } + + if (this.endpointsConfig.metrics) { + await this.prometheusMetricsService.init(this.app); + } + } + private async healthcheck(_req: express.Request, res: express.Response) { this.logger.debug('[WorkerServer] Health check started');