perf(core): Optimize worker healthchecks (#11092)

This commit is contained in:
Iván Ovejero 2024-10-07 16:33:34 +02:00 committed by GitHub
parent 383b4765d2
commit 19fb728da0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 36 additions and 44 deletions

View file

@ -2,7 +2,11 @@ import { Config, Env, Nested } from '../decorators';
@Config
class HealthConfig {
/** Whether to enable the worker health check endpoint `/healthz`. */
/**
* Whether to enable the worker health check endpoints:
* - `/healthz` (worker alive)
* - `/healthz/readiness` (worker is connected to a migrated database and to Redis)
*/
@Env('QUEUE_HEALTH_CHECK_ACTIVE')
active: boolean = false;

View file

@ -6,11 +6,13 @@ This list shows all the versions which include breaking changes and how to upgra
### What changed?
The worker server used to bind to IPv6 by default. It now binds to IPv4 by default.
1. The worker server used to bind to IPv6 by default. It now binds to IPv4 by default.
2. The worker server's `/healthz` endpoint used to report healthy status only when its database and Redis checks passed. It now reports healthy status regardless of database and Redis status; the database and Redis checks are now part of `/healthz/readiness`.
### When is action necessary?
If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`.
1. If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`.
2. If you are relying on database and Redis checks for worker health status, switch to checking `/healthz/readiness` instead of `/healthz`.
## 1.57.0

View file

@ -50,10 +50,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
mock<InstanceSettings>({ instanceType: 'webhook' }),
prometheusMetricsService,
mock(),
),
).toThrowError(AssertionError);
});
@ -75,10 +75,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
expect(procesExitSpy).toHaveBeenCalledWith(1);
@ -102,10 +102,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites';
@ -137,10 +137,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
await workerServer.init({ health: true, overwrites: false, metrics: true });
@ -158,10 +158,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
await expect(
workerServer.init({ health: false, overwrites: false, metrics: false }),
@ -176,10 +176,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
server.listen.mockImplementation((...args: unknown[]) => {

View file

@ -2,7 +2,6 @@ import { GlobalConfig } from '@n8n/config';
import type { Application } from 'express';
import express from 'express';
import { InstanceSettings } from 'n8n-core';
import { ensureError } from 'n8n-workflow';
import { strict as assert } from 'node:assert';
import http from 'node:http';
import type { Server } from 'node:http';
@ -12,14 +11,13 @@ import { CredentialsOverwrites } from '@/credentials-overwrites';
import * as Db from '@/db';
import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error';
import { NonJsonBodyError } from '@/errors/non-json-body.error';
import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error';
import { ExternalHooks } from '@/external-hooks';
import type { ICredentialsOverwrite } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
import { rawBodyReader, bodyParser } from '@/middlewares';
import * as ResponseHelper from '@/response-helper';
import { ScalingService } from '@/scaling/scaling.service';
import { RedisClientService } from '@/services/redis-client.service';
export type WorkerServerEndpointsConfig = {
/** Whether the `/healthz` and `/healthz/readiness` endpoints are enabled. */
@ -52,11 +50,11 @@ export class WorkerServer {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly logger: Logger,
private readonly scalingService: ScalingService,
private readonly credentialsOverwrites: CredentialsOverwrites,
private readonly externalHooks: ExternalHooks,
private readonly instanceSettings: InstanceSettings,
private readonly prometheusMetricsService: PrometheusMetricsService,
private readonly redisClientService: RedisClientService,
) {
assert(this.instanceSettings.instanceType === 'worker');
@ -94,11 +92,14 @@ export class WorkerServer {
}
private async mountEndpoints() {
if (this.endpointsConfig.health) {
this.app.get('/healthz', async (req, res) => await this.healthcheck(req, res));
const { health, overwrites, metrics } = this.endpointsConfig;
if (health) {
this.app.get('/healthz', async (_, res) => res.send({ status: 'ok' }));
this.app.get('/healthz/readiness', async (_, res) => await this.readiness(_, res));
}
if (this.endpointsConfig.overwrites) {
if (overwrites) {
const { endpoint } = this.globalConfig.credentials.overwrite;
this.app.post(`/${endpoint}`, rawBodyReader, bodyParser, (req, res) =>
@ -106,39 +107,20 @@ export class WorkerServer {
);
}
if (this.endpointsConfig.metrics) {
if (metrics) {
await this.prometheusMetricsService.init(this.app);
}
}
private async healthcheck(_req: express.Request, res: express.Response) {
this.logger.debug('[WorkerServer] Health check started');
private async readiness(_req: express.Request, res: express.Response) {
const isReady =
Db.connectionState.connected &&
Db.connectionState.migrated &&
this.redisClientService.isConnected();
try {
await Db.getConnection().query('SELECT 1');
} catch (value) {
this.logger.error('[WorkerServer] No database connection', ensureError(value));
return ResponseHelper.sendErrorResponse(
res,
new ServiceUnavailableError('No database connection'),
);
}
try {
await this.scalingService.pingQueue();
} catch (value) {
this.logger.error('[WorkerServer] No Redis connection', ensureError(value));
return ResponseHelper.sendErrorResponse(
res,
new ServiceUnavailableError('No Redis connection'),
);
}
this.logger.debug('[WorkerServer] Health check succeeded');
ResponseHelper.sendSuccessResponse(res, { status: 'ok' }, true, 200);
return isReady
? res.status(200).send({ status: 'ok' })
: res.status(503).send({ status: 'error' });
}
private handleOverwrites(

View file

@ -40,6 +40,10 @@ export class RedisClientService extends TypedEmitter<RedisEventMap> {
this.registerListeners();
}
/**
 * Report whether the Redis connection is currently regarded as intact.
 *
 * @returns `true` while the `lostConnection` flag is falsy, `false` once it is set.
 */
isConnected() {
	const connectionLost = this.lostConnection;

	return !connectionLost;
}
createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) {
const client =
this.clusterNodes().length > 0