diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index b5615ca25e..812bf42802 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -7,6 +7,8 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/no-unused-vars */ // eslint-disable-next-line import/no-extraneous-dependencies +import * as express from 'express'; +import * as http from 'http'; import * as PCancelable from 'p-cancelable'; import { Command, flags } from '@oclif/command'; @@ -14,7 +16,7 @@ import { BinaryDataManager, IBinaryDataConfig, UserSettings, WorkflowExecute } f import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; -import { FindOneOptions } from 'typeorm'; +import { FindOneOptions, getConnectionManager } from 'typeorm'; import * as Bull from 'bull'; import { @@ -328,6 +330,77 @@ export class Worker extends Command { logger.error('Error from queue: ', error); } }); + + if (config.get('queue.health.active')) { + const port = config.get('queue.health.port') as number; + + const app = express(); + const server = http.createServer(app); + + app.get( + '/healthz', + // eslint-disable-next-line consistent-return + async (req: express.Request, res: express.Response) => { + LoggerProxy.debug('Health check started!'); + + const connection = getConnectionManager().get(); + + try { + if (!connection.isConnected) { + // Connection is not active + throw new Error('No active database connection!'); + } + // DB ping + await connection.query('SELECT 1'); + } catch (e) { + LoggerProxy.error('No Database connection!', e); + const error = new ResponseHelper.ResponseError( + 'No Database connection!', + undefined, + 503, + ); + return ResponseHelper.sendErrorResponse(res, error); + } + + // Just to be complete, generally will the worker stop automatically + // if it loses the conection to redis + try { + // Redis ping + await Worker.jobQueue.client.ping(); + } catch (e) { + LoggerProxy.error('No Redis connection!', e); + const error = new ResponseHelper.ResponseError( + 'No Redis connection!', + undefined, + 503, + ); + return ResponseHelper.sendErrorResponse(res, error); + } + + // Everything fine + const responseData = { + status: 'ok', + }; + + LoggerProxy.debug('Health check completed successfully!'); + + ResponseHelper.sendSuccessResponse(res, responseData, true, 200); + }, + ); + + server.listen(port, () => { + console.info(`\nn8n worker health check via, port ${port}`); + }); + + server.on('error', (error: Error & { code: string }) => { + if (error.code === 'EADDRINUSE') { + console.log( + `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, + ); + process.exit(1); + } + }); + } } catch (error) { logger.error(`Worker process cannot continue. "${error.message}"`); diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index 08906c7375..9bd78931a3 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -290,6 +290,20 @@ const config = convict({ }, queue: { + health: { + active: { + doc: 'If health checks should be enabled', + format: 'Boolean', + default: false, + env: 'QUEUE_HEALTH_CHECK_ACTIVE', + }, + port: { + doc: 'Port to serve health check on if activated', + format: Number, + default: 5678, + env: 'QUEUE_HEALTH_CHECK_PORT', + }, + }, bull: { prefix: { doc: 'Prefix for all queue keys', diff --git a/packages/cli/package.json b/packages/cli/package.json index 2a137fcee2..e50895ee9b 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -76,7 +76,6 @@ "concurrently": "^5.1.0", "jest": "^26.4.2", "nodemon": "^2.0.2", - "p-cancelable": "^2.0.0", "run-script-os": "^1.0.7", "ts-jest": "^26.3.0", "ts-node": "^8.9.1", @@ -120,6 +119,7 @@ "n8n-workflow": "~0.86.0", "oauth-1.0a": "^2.2.6", "open": "^7.0.0", + "p-cancelable": "^2.0.0", "pg": "^8.3.0", "prom-client": "^13.1.0", "request-promise-native": "^1.0.7",