diff --git a/packages/@n8n/config/src/configs/scaling-mode.config.ts b/packages/@n8n/config/src/configs/scaling-mode.config.ts index a1f5b2a7d6..05ee6b4841 100644 --- a/packages/@n8n/config/src/configs/scaling-mode.config.ts +++ b/packages/@n8n/config/src/configs/scaling-mode.config.ts @@ -2,7 +2,11 @@ import { Config, Env, Nested } from '../decorators'; @Config class HealthConfig { - /** Whether to enable the worker health check endpoint `/healthz`. */ + /** + * Whether to enable the worker health check endpoints: + * - `/healthz` (worker alive) + * - `/healthz/readiness` (worker connected to migrated database and connected to Redis) + */ @Env('QUEUE_HEALTH_CHECK_ACTIVE') active: boolean = false; diff --git a/packages/cli/BREAKING-CHANGES.md b/packages/cli/BREAKING-CHANGES.md index 012892c6ae..bdb1ff5890 100644 --- a/packages/cli/BREAKING-CHANGES.md +++ b/packages/cli/BREAKING-CHANGES.md @@ -6,11 +6,13 @@ This list shows all the versions which include breaking changes and how to upgra ### What changed? -The worker server used to bind to IPv6 by default. It now binds to IPv4 by default. +1. The worker server used to bind to IPv6 by default. It now binds to IPv4 by default. +2. The worker server's `/healthz` used to report healthy status based on database and Redis checks. It now reports healthy status regardless of database and Redis status, and the database and Redis checks are part of `/healthz/readiness`. ### When is action necessary? -If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`. +1. If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`. +2. If you are relying on database and Redis checks for worker health status, switch to checking `/healthz/readiness` instead of `/healthz`. ## 1.57.0 diff --git a/packages/cli/src/scaling/__tests__/worker-server.test.ts b/packages/cli/src/scaling/__tests__/worker-server.test.ts index 7f3a21a778..778d403bf2 100644 --- a/packages/cli/src/scaling/__tests__/worker-server.test.ts +++ b/packages/cli/src/scaling/__tests__/worker-server.test.ts @@ -50,10 +50,10 @@ describe('WorkerServer', () => { globalConfig, mock(), mock(), - mock(), externalHooks, mock({ instanceType: 'webhook' }), prometheusMetricsService, + mock(), ), ).toThrowError(AssertionError); }); @@ -75,10 +75,10 @@ describe('WorkerServer', () => { globalConfig, mock(), mock(), - mock(), externalHooks, instanceSettings, prometheusMetricsService, + mock(), ); expect(procesExitSpy).toHaveBeenCalledWith(1); @@ -102,10 +102,10 @@ describe('WorkerServer', () => { globalConfig, mock(), mock(), - mock(), externalHooks, instanceSettings, prometheusMetricsService, + mock(), ); const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites'; @@ -137,10 +137,10 @@ describe('WorkerServer', () => { globalConfig, mock(), mock(), - mock(), externalHooks, instanceSettings, prometheusMetricsService, + mock(), ); await workerServer.init({ health: true, overwrites: false, metrics: true }); @@ -158,10 +158,10 @@ describe('WorkerServer', () => { globalConfig, mock(), mock(), - mock(), externalHooks, instanceSettings, prometheusMetricsService, + mock(), ); await expect( workerServer.init({ health: false, overwrites: false, metrics: false }), @@ -176,10 +176,10 @@ describe('WorkerServer', () => { globalConfig, mock(), mock(), - mock(), externalHooks, instanceSettings, prometheusMetricsService, + mock(), ); server.listen.mockImplementation((...args: unknown[]) => { diff --git a/packages/cli/src/scaling/worker-server.ts b/packages/cli/src/scaling/worker-server.ts index 3343ce4e49..3cf6995882 100644 --- a/packages/cli/src/scaling/worker-server.ts +++ b/packages/cli/src/scaling/worker-server.ts @@ -2,7 +2,6 @@ import { GlobalConfig } from '@n8n/config'; import type { Application } from 'express'; import express from 'express'; import { InstanceSettings } from 'n8n-core'; -import { ensureError } from 'n8n-workflow'; import { strict as assert } from 'node:assert'; import http from 'node:http'; import type { Server } from 'node:http'; @@ -12,14 +11,13 @@ import { CredentialsOverwrites } from '@/credentials-overwrites'; import * as Db from '@/db'; import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error'; import { NonJsonBodyError } from '@/errors/non-json-body.error'; -import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; import { ExternalHooks } from '@/external-hooks'; import type { ICredentialsOverwrite } from '@/interfaces'; import { Logger } from '@/logging/logger.service'; import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { rawBodyReader, bodyParser } from '@/middlewares'; import * as ResponseHelper from '@/response-helper'; -import { ScalingService } from '@/scaling/scaling.service'; +import { RedisClientService } from '@/services/redis-client.service'; export type WorkerServerEndpointsConfig = { /** Whether the `/healthz` endpoint is enabled. */ @@ -52,11 +50,11 @@ export class WorkerServer { constructor( private readonly globalConfig: GlobalConfig, private readonly logger: Logger, - private readonly scalingService: ScalingService, private readonly credentialsOverwrites: CredentialsOverwrites, private readonly externalHooks: ExternalHooks, private readonly instanceSettings: InstanceSettings, private readonly prometheusMetricsService: PrometheusMetricsService, + private readonly redisClientService: RedisClientService, ) { assert(this.instanceSettings.instanceType === 'worker'); @@ -94,11 +92,14 @@ export class WorkerServer { } private async mountEndpoints() { - if (this.endpointsConfig.health) { - this.app.get('/healthz', async (req, res) => await this.healthcheck(req, res)); + const { health, overwrites, metrics } = this.endpointsConfig; + + if (health) { + this.app.get('/healthz', async (_, res) => res.send({ status: 'ok' })); + this.app.get('/healthz/readiness', async (_, res) => await this.readiness(_, res)); } - if (this.endpointsConfig.overwrites) { + if (overwrites) { const { endpoint } = this.globalConfig.credentials.overwrite; this.app.post(`/${endpoint}`, rawBodyReader, bodyParser, (req, res) => @@ -106,39 +107,20 @@ export class WorkerServer { ); } - if (this.endpointsConfig.metrics) { + if (metrics) { await this.prometheusMetricsService.init(this.app); } } - private async healthcheck(_req: express.Request, res: express.Response) { - this.logger.debug('[WorkerServer] Health check started'); + private async readiness(_req: express.Request, res: express.Response) { + const isReady = + Db.connectionState.connected && + Db.connectionState.migrated && + this.redisClientService.isConnected(); - try { - await Db.getConnection().query('SELECT 1'); - } catch (value) { - this.logger.error('[WorkerServer] No database connection', ensureError(value)); - - return ResponseHelper.sendErrorResponse( - res, - new ServiceUnavailableError('No database connection'), - ); - } - - try { - await this.scalingService.pingQueue(); - } catch (value) { - this.logger.error('[WorkerServer] No Redis connection', ensureError(value)); - - return ResponseHelper.sendErrorResponse( - res, - new ServiceUnavailableError('No Redis connection'), - ); - } - - this.logger.debug('[WorkerServer] Health check succeeded'); - - ResponseHelper.sendSuccessResponse(res, { status: 'ok' }, true, 200); + return isReady + ? res.status(200).send({ status: 'ok' }) + : res.status(503).send({ status: 'error' }); } private handleOverwrites( diff --git a/packages/cli/src/services/redis-client.service.ts b/packages/cli/src/services/redis-client.service.ts index f205a756c5..5eaa6edc1d 100644 --- a/packages/cli/src/services/redis-client.service.ts +++ b/packages/cli/src/services/redis-client.service.ts @@ -40,6 +40,10 @@ export class RedisClientService extends TypedEmitter { this.registerListeners(); } + isConnected() { + return !this.lostConnection; + } + createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) { const client = this.clusterNodes().length > 0