feat(core): Add commands to workers to respond with current state (#7029)

This PR adds new endpoints to the REST API:
`/orchestration/worker/status` and `/orchestration/worker/id`

Currently these just trigger the return of status / ids from the workers
via the redis back channel, this still needs to be handled and passed
through to the frontend.

It also adds the eventbus to each worker, and triggers a reload of those
eventbus instances when the configuration changes on the main instances.
This commit is contained in:
Michael Auerswald 2023-09-07 14:44:19 +02:00 committed by GitHub
parent 0a35025e5e
commit 7b49cf2a2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 794 additions and 225 deletions

View file

@ -4,7 +4,7 @@ import type { Server } from 'http';
import express from 'express';
import compression from 'compression';
import isbot from 'isbot';
import { jsonParse, LoggerProxy as Logger } from 'n8n-workflow';
import { LoggerProxy as Logger } from 'n8n-workflow';
import config from '@/config';
import { N8N_VERSION, inDevelopment, inTest } from '@/constants';
@ -18,16 +18,8 @@ import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import { webhookRequestHandler } from '@/WebhookHelpers';
import { RedisService } from '@/services/redis.service';
import { eventBus } from './eventbus';
import type { AbstractEventMessageOptions } from './eventbus/EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from './eventbus/EventMessageClasses/Helpers';
import type { RedisServiceWorkerResponseObject } from './services/redis/RedisServiceCommands';
import {
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './services/redis/RedisServiceHelper';
import { generateHostInstanceId } from './databases/utils/generators';
import { OrchestrationService } from './services/orchestration.service';
export abstract class AbstractServer {
protected server: Server;
@ -124,78 +116,11 @@ export abstract class AbstractServer {
});
if (config.getEnv('executions.mode') === 'queue') {
await this.setupRedis();
// will start the redis connections
await Container.get(OrchestrationService).init(this.uniqueInstanceId);
}
}
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using a retryStrategy to control how and when to exit.
// We are also subscribing to the event log channel to receive events from workers
private async setupRedis() {
const redisService = Container.get(RedisService);
const redisSubscriber = await redisService.getPubSubSubscriber();
// TODO: these are all proof of concept implementations for the moment
// until worker communication is implemented
// #region proof of concept
await redisSubscriber.subscribeToEventLog();
await redisSubscriber.subscribeToWorkerResponseChannel();
redisSubscriber.addMessageHandler(
'AbstractServerReceiver',
async (channel: string, message: string) => {
// TODO: this is a proof of concept implementation to forward events to the main instance's event bus
// Events are arriving through a pub/sub channel and are forwarded to the eventBus
// In the future, a stream should probably replace this implementation entirely
if (channel === EVENT_BUS_REDIS_CHANNEL) {
const eventData = jsonParse<AbstractEventMessageOptions>(message);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await eventBus.send(eventMessage);
}
}
} else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
// The back channel from the workers as a pub/sub channel
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(message);
if (workerResponse) {
// TODO: Handle worker response
console.log('Received worker response', workerResponse);
}
}
},
);
// TODO: Leave comments for now as implementation example
// const redisStreamListener = await redisService.getStreamConsumer();
// void redisStreamListener.listenToStream('teststream');
// redisStreamListener.addMessageHandler(
// 'MessageLogger',
// async (stream: string, id: string, message: string[]) => {
// // TODO: this is a proof of concept implementation of a stream consumer
// switch (stream) {
// case EVENT_BUS_REDIS_STREAM:
// case COMMAND_REDIS_STREAM:
// case WORKER_RESPONSE_REDIS_STREAM:
// default:
// LoggerProxy.debug(
// `Received message from stream ${stream} with id ${id} and message ${message.join(
// ',',
// )}`,
// );
// break;
// }
// },
// );
// const redisListReceiver = await redisService.getListReceiver();
// await redisListReceiver.init();
// setInterval(async () => {
// await redisListReceiver.popLatestWorkerResponse();
// }, 1000);
// #endregion
}
async init(): Promise<void> {
const { app, protocol, sslKey, sslCert } = this;

View file

@ -177,6 +177,7 @@ import { handleMfaDisable, isMfaFeatureEnabled } from './Mfa/helpers';
import { JwtService } from './services/jwt.service';
import { RoleService } from './services/role.service';
import { UserService } from './services/user.service';
import { OrchestrationController } from './controllers/orchestration.controller';
const exec = promisify(callbackExec);
@ -551,6 +552,7 @@ export class Server extends AbstractServer {
Container.get(SourceControlController),
Container.get(WorkflowStatisticsController),
Container.get(ExternalSecretsController),
Container.get(OrchestrationController),
];
if (isLdapEnabled()) {

View file

@ -103,12 +103,12 @@ export abstract class BaseCommand extends Command {
process.exit(1);
}
protected async initBinaryManager() {
async initBinaryManager() {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
}
protected async initExternalHooks() {
async initExternalHooks() {
this.externalHooks = Container.get(ExternalHooks);
await this.externalHooks.init();
}

View file

@ -27,6 +27,11 @@ import { generateHostInstanceId } from '@/databases/utils/generators';
import type { ICredentialsOverwrite } from '@/Interfaces';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { rawBodyReader, bodyParser } from '@/middlewares';
import { eventBus } from '../eventbus';
import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher';
import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber';
import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric';
import { getWorkerCommandReceivedHandler } from './workerCommandHandler';
export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
@ -49,6 +54,10 @@ export class Worker extends BaseCommand {
readonly uniqueInstanceId = generateHostInstanceId('worker');
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber;
/**
* Stop n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
@ -240,9 +249,48 @@ export class Worker extends BaseCommand {
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
await this.initEventBus();
await this.initRedis();
await this.initQueue();
}
async run() {
async initEventBus() {
await eventBus.initialize({
workerId: this.uniqueInstanceId,
});
}
/**
* Initializes the redis connection
* A publishing connection to redis is created to publish events to the event log
* A subscription connection to redis is created to subscribe to commands from the main process
* The subscription connection adds a handler to handle the command messages
*/
async initRedis() {
this.redisPublisher = Container.get(RedisServicePubSubPublisher);
this.redisSubscriber = Container.get(RedisServicePubSubSubscriber);
await this.redisPublisher.init();
await this.redisPublisher.publishToEventLog(
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.uniqueInstanceId,
},
}),
);
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'WorkerCommandReceivedHandler',
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({
uniqueInstanceId: this.uniqueInstanceId,
redisPublisher: this.redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
}),
);
}
async initQueue() {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Worker);
@ -255,11 +303,6 @@ export class Worker extends BaseCommand {
this.runJob(job, this.nodeTypes),
);
this.logger.info('\nn8n worker is now ready');
this.logger.info(` * Version: ${N8N_VERSION}`);
this.logger.info(` * Concurrency: ${flags.concurrency}`);
this.logger.info('');
Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => {
// Progress of a job got updated which does get used
// to communicate that a job got canceled.
@ -305,105 +348,116 @@ export class Worker extends BaseCommand {
throw error;
}
});
}
if (config.getEnv('queue.health.active')) {
const port = config.getEnv('queue.health.port');
async setupHealthMonitor() {
const port = config.getEnv('queue.health.port');
const app = express();
app.disable('x-powered-by');
const app = express();
app.disable('x-powered-by');
const server = http.createServer(app);
const server = http.createServer(app);
app.get(
'/healthz',
app.get(
'/healthz',
async (req: express.Request, res: express.Response) => {
LoggerProxy.debug('Health check started!');
const connection = Db.getConnection();
try {
if (!connection.isInitialized) {
// Connection is not active
throw new Error('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
} catch (e) {
LoggerProxy.error('No Database connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Database connection!');
return ResponseHelper.sendErrorResponse(res, error);
}
// Just to be complete, generally will the worker stop automatically
// if it loses the connection to redis
try {
// Redis ping
await Worker.jobQueue.client.ping();
} catch (e) {
LoggerProxy.error('No Redis connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
return ResponseHelper.sendErrorResponse(res, error);
}
// Everything fine
const responseData = {
status: 'ok',
};
LoggerProxy.debug('Health check completed successfully!');
ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
},
);
let presetCredentialsLoaded = false;
const endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
if (endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
app.post(
`/${endpointPresetCredentials}`,
rawBodyReader,
bodyParser,
async (req: express.Request, res: express.Response) => {
LoggerProxy.debug('Health check started!');
if (!presetCredentialsLoaded) {
const body = req.body as ICredentialsOverwrite;
const connection = Db.getConnection();
try {
if (!connection.isInitialized) {
// Connection is not active
throw new Error('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
} catch (e) {
LoggerProxy.error('No Database connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Database connection!');
return ResponseHelper.sendErrorResponse(res, error);
}
// Just to be complete, generally will the worker stop automatically
// if it loses the connection to redis
try {
// Redis ping
await Worker.jobQueue.client.ping();
} catch (e) {
LoggerProxy.error('No Redis connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
return ResponseHelper.sendErrorResponse(res, error);
}
// Everything fine
const responseData = {
status: 'ok',
};
LoggerProxy.debug('Health check completed successfully!');
ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
},
);
let presetCredentialsLoaded = false;
const endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
if (endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
app.post(
`/${endpointPresetCredentials}`,
rawBodyReader,
bodyParser,
async (req: express.Request, res: express.Response) => {
if (!presetCredentialsLoaded) {
const body = req.body as ICredentialsOverwrite;
if (req.contentType !== 'application/json') {
ResponseHelper.sendErrorResponse(
res,
new Error(
'Body must be a valid JSON, make sure the content-type is application/json',
),
);
return;
}
CredentialsOverwrites().setData(body);
presetCredentialsLoaded = true;
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200);
} else {
if (req.contentType !== 'application/json') {
ResponseHelper.sendErrorResponse(
res,
new Error('Preset credentials can be set once'),
new Error(
'Body must be a valid JSON, make sure the content-type is application/json',
),
);
return;
}
},
CredentialsOverwrites().setData(body);
presetCredentialsLoaded = true;
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200);
} else {
ResponseHelper.sendErrorResponse(res, new Error('Preset credentials can be set once'));
}
},
);
}
server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
this.logger.error(
`n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`,
);
process.exit(1);
}
});
server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
this.logger.error(
`n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`,
);
process.exit(1);
}
});
await new Promise<void>((resolve) => server.listen(port, () => resolve()));
await this.externalHooks.run('worker.ready');
this.logger.info(`\nn8n worker health check via, port ${port}`);
}
await new Promise<void>((resolve) => server.listen(port, () => resolve()));
await this.externalHooks.run('worker.ready');
this.logger.info(`\nn8n worker health check via, port ${port}`);
async run() {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Worker);
this.logger.info('\nn8n worker is now ready');
this.logger.info(` * Version: ${N8N_VERSION}`);
this.logger.info(` * Concurrency: ${flags.concurrency}`);
this.logger.info('');
if (config.getEnv('queue.health.active')) {
await this.setupHealthMonitor();
}
// Make sure that the process does not close

View file

@ -0,0 +1,82 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import { eventBus } from '../eventbus';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher';
import * as os from 'os';
export function getWorkerCommandReceivedHandler(options: {
uniqueInstanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
}) {
return async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
if (!messageString) return;
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
LoggerProxy.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
if (message.targets && !message.targets.includes(options.uniqueInstanceId)) {
return; // early return if the message is not for this worker
}
switch (message.command) {
case 'getStatus':
await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId,
command: message.command,
payload: {
workerId: options.uniqueInstanceId,
runningJobs: options.getRunningJobIds(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
net: Object.values(os.networkInterfaces()).flatMap(
(interfaces) =>
interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '',
),
},
});
break;
case 'getId':
await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId,
command: message.command,
});
break;
case 'restartEventBus':
await eventBus.restart();
await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId,
command: message.command,
payload: {
result: 'success',
},
});
break;
case 'stopWorker':
// TODO: implement proper shutdown
// await this.stopProcess();
break;
default:
LoggerProxy.debug(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,
);
break;
}
}
}
};
}

View file

@ -0,0 +1,35 @@
import config from '@/config';
import { Authorized, Get, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi';
import { OrchestrationService } from '../services/orchestration.service';
@Authorized(['global', 'owner'])
@RestController('/orchestration')
@Service()
export class OrchestrationController {
private config = config;
constructor(private readonly orchestrationService: OrchestrationService) {}
/**
* These endpoint currently do not return anything, they just trigger the messsage to
* the workers to respond on Redis with their status.
* TODO: these responses need to be forwarded to and handled by the frontend
*/
@Get('/worker/status/:id')
async getWorkersStatus(req: OrchestrationRequest.Get) {
const id = req.params.id;
return this.orchestrationService.getWorkerStatus(id);
}
@Get('/worker/status')
async getWorkersStatusAll() {
return this.orchestrationService.getWorkerStatus();
}
@Get('/worker/ids')
async getWorkerIdsAll() {
return this.orchestrationService.getWorkerIds();
}
}

View file

@ -9,6 +9,7 @@ export const eventNamesWorkflow = [
'n8n.workflow.failed',
'n8n.workflow.crashed',
] as const;
export const eventNamesGeneric = ['n8n.worker.started', 'n8n.worker.stopped'] as const;
export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const;
export const eventNamesAudit = [
'n8n.audit.user.login.success',
@ -37,14 +38,21 @@ export const eventNamesAudit = [
export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number];
export type EventNamesAuditType = (typeof eventNamesAudit)[number];
export type EventNamesNodeType = (typeof eventNamesNode)[number];
export type EventNamesGenericType = (typeof eventNamesGeneric)[number];
export type EventNamesTypes =
| EventNamesAuditType
| EventNamesWorkflowType
| EventNamesNodeType
| EventNamesGenericType
| 'n8n.destination.test';
export const eventNamesAll = [...eventNamesAudit, ...eventNamesWorkflow, ...eventNamesNode];
export const eventNamesAll = [
...eventNamesAudit,
...eventNamesWorkflow,
...eventNamesNode,
...eventNamesGeneric,
];
export type EventMessageTypes =
| EventMessageGeneric

View file

@ -29,6 +29,7 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { OrchestrationService } from '../../services/orchestration.service';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@ -37,6 +38,11 @@ export interface MessageWithCallback {
confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void;
}
export interface MessageEventBusInitializeOptions {
skipRecoveryPass?: boolean;
workerId?: string;
}
export class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus;
@ -70,7 +76,7 @@ export class MessageEventBus extends EventEmitter {
*
* Sets `isInitialized` to `true` once finished.
*/
async initialize() {
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> {
if (this.isInitialized) {
return;
}
@ -93,64 +99,75 @@ export class MessageEventBus extends EventEmitter {
}
LoggerProxy.debug('Initializing event writer');
this.logWriter = await MessageEventBusLogWriter.getInstance();
if (options?.workerId) {
// only add 'worker' to log file name since the ID changes on every start and we
// would not be able to recover the log files from the previous run not knowing it
const logBaseName = config.getEnv('eventBus.logWriter.logBaseName') + '-worker';
this.logWriter = await MessageEventBusLogWriter.getInstance({
logBaseName,
});
} else {
this.logWriter = await MessageEventBusLogWriter.getInstance();
}
if (!this.logWriter) {
LoggerProxy.warn('Could not initialize event writer');
}
// unsent event check:
// - find unsent messages in current event log(s)
// - cycle event logs and start the logging to a fresh file
// - retry sending events
LoggerProxy.debug('Checking for unsent event messages');
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug(
`Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `,
);
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);
if (options?.skipRecoveryPass) {
LoggerProxy.debug('Skipping unsent event check');
} else {
// unsent event check:
// - find unsent messages in current event log(s)
// - cycle event logs and start the logging to a fresh file
// - retry sending events
LoggerProxy.debug('Checking for unsent event messages');
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug(
`Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `,
);
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
if (unfinishedExecutionIds.length > 0) {
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await Container.get(WorkflowRepository).find({
where: { active: true },
select: ['id', 'name'],
});
if (activeWorkflows.length > 0) {
LoggerProxy.info('Currently active workflows:');
for (const workflowData of activeWorkflows) {
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
if (unfinishedExecutionIds.length > 0) {
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await Container.get(WorkflowRepository).find({
where: { active: true },
select: ['id', 'name'],
});
if (activeWorkflows.length > 0) {
LoggerProxy.info('Currently active workflows:');
for (const workflowData of activeWorkflows) {
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
}
}
}
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
// if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
if (recoveryAlreadyAttempted)
LoggerProxy.warn('Skipped recovery process since it previously failed.');
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
// if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
if (recoveryAlreadyAttempted)
LoggerProxy.warn('Skipped recovery process since it previously failed.');
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
}
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}
// if configured, run this test every n ms
if (config.getEnv('eventBus.checkUnsentInterval') > 0) {
if (this.pushIntervalTimer) {
@ -192,6 +209,12 @@ export class MessageEventBus extends EventEmitter {
return result;
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await Container.get(OrchestrationService).restartEventBus();
}
}
private async trySendingUnsent(msgs?: EventMessageTypes[]) {
const unsentMessages = msgs ?? (await this.getEventsUnsent());
if (unsentMessages.length > 0) {
@ -212,9 +235,15 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');
}
async restart() {
await this.close();
await this.initialize({ skipRecoveryPass: true });
}
async send(msgs: EventMessageTypes | EventMessageTypes[]) {
if (!Array.isArray(msgs)) {
msgs = [msgs];

View file

@ -535,3 +535,12 @@ export declare namespace ExternalSecretsRequest {
type UpdateProvider = AuthenticatedRequest<{ provider: string }>;
}
// ----------------------------------
// /orchestration
// ----------------------------------
//
export declare namespace OrchestrationRequest {
type GetAll = AuthenticatedRequest;
type Get = AuthenticatedRequest<{ id: string }, {}, {}, {}>;
}

View file

@ -0,0 +1,172 @@
import { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import { eventBus } from '../eventbus';
import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from './redis/RedisServiceCommands';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './redis/RedisServiceHelper';
@Service()
export class OrchestrationService {
private initialized = false;
private _uniqueInstanceId = '';
get uniqueInstanceId(): string {
return this._uniqueInstanceId;
}
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber;
constructor(readonly redisService: RedisService) {}
async init(uniqueInstanceId: string) {
this._uniqueInstanceId = uniqueInstanceId;
await this.initPublisher();
await this.initSubscriber();
this.initialized = true;
}
async shutdown() {
await this.redisPublisher?.destroy();
await this.redisSubscriber?.destroy();
}
private async initPublisher() {
this.redisPublisher = await this.redisService.getPubSubPublisher();
}
private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
// TODO: these are all proof of concept implementations for the moment
// until worker communication is implemented
// #region proof of concept
await this.redisSubscriber.subscribeToEventLog();
await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
// TODO: this is a proof of concept implementation to forward events to the main instance's event bus
// Events are arriving through a pub/sub channel and are forwarded to the eventBus
// In the future, a stream should probably replace this implementation entirely
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleEventBusMessage(messageString);
} else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await this.handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await this.handleCommandMessage(messageString);
}
},
);
}
async handleWorkerResponseMessage(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse);
}
return workerResponse;
}
async handleEventBusMessage(messageString: string) {
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await eventBus.send(eventMessage);
}
}
return eventData;
}
async handleCommandMessage(messageString: string) {
if (!messageString) return;
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
LoggerProxy.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
if (
message.senderId === this.uniqueInstanceId ||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'restartEventBus':
await eventBus.restart();
break;
}
return message;
}
return;
}
async getWorkerStatus(id?: string) {
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'getStatus',
targets: id ? [id] : undefined,
});
}
async getWorkerIds() {
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'getId',
});
}
// TODO: not implemented yet on worker side
async stopWorker(id?: string) {
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'stopWorker',
targets: id ? [id] : undefined,
});
}
async restartEventBus(id?: string) {
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'restartEventBus',
targets: id ? [id] : undefined,
});
}
}

View file

@ -1,12 +1,13 @@
export type RedisServiceCommand = 'getStatus' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands
export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands
/**
* An object to be sent via Redis pub/sub from the main process to the workers.
* @field command: The command to be executed.
* @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids.
* @field args: Optional arguments to be passed to the command.
* @field payload: Optional arguments to be sent with the command.
*/
type RedisServiceBaseCommand = {
senderId: string;
command: RedisServiceCommand;
payload?: {
[key: string]: string | number | boolean | string[] | number[] | boolean[];
@ -15,7 +16,38 @@ type RedisServiceBaseCommand = {
export type RedisServiceWorkerResponseObject = {
workerId: string;
} & RedisServiceBaseCommand;
} & (
| RedisServiceBaseCommand
| {
command: 'getStatus';
payload: {
workerId: string;
runningJobs: string[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string[];
arch: string;
platform: NodeJS.Platform;
hostname: string;
net: string[];
};
}
| {
command: 'getId';
}
| {
command: 'restartEventBus';
payload: {
result: 'success' | 'error';
error?: string;
};
}
| {
command: 'stopWorker';
}
);
export type RedisServiceCommandObject = {
targets?: string[];

View file

@ -23,11 +23,11 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
if (!this.redisClient) {
await this.init();
}
await this.redisClient?.subscribe(channel, (error, count: number) => {
await this.redisClient?.subscribe(channel, (error, _count: number) => {
if (error) {
Logger.error(`Error subscribing to channel ${channel}`);
} else {
Logger.debug(`Subscribed ${count.toString()} to eventlog channel`);
Logger.debug(`Subscribed Redis PubSub client to channel: ${channel}`);
}
});
}

View file

@ -0,0 +1,81 @@
import { mockInstance } from '../shared/utils/';
import { Worker } from '@/commands/worker';
import * as Config from '@oclif/config';
import { LoggerProxy } from 'n8n-workflow';
import { Telemetry } from '@/telemetry';
import { getLogger } from '@/Logger';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { BinaryDataManager } from 'n8n-core';
import { CacheService } from '@/services/cache.service';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { CredentialTypes } from '@/CredentialTypes';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog';
import { RedisService } from '@/services/redis.service';
const config: Config.IConfig = new Config.Config({ root: __dirname });
beforeAll(async () => {
LoggerProxy.init(getLogger());
mockInstance(Telemetry);
mockInstance(PostHogClient);
mockInstance(InternalHooks);
mockInstance(CacheService);
mockInstance(ExternalSecretsManager);
mockInstance(BinaryDataManager);
mockInstance(MessageEventBus);
mockInstance(LoadNodesAndCredentials);
mockInstance(CredentialTypes);
mockInstance(NodeTypes);
mockInstance(RedisService);
mockInstance(RedisServicePubSubPublisher);
mockInstance(RedisServicePubSubSubscriber);
});
test('worker initializes all its components', async () => {
const worker = new Worker([], config);
jest.spyOn(worker, 'init');
jest.spyOn(worker, 'initLicense').mockImplementation(async () => {});
jest.spyOn(worker, 'initBinaryManager').mockImplementation(async () => {});
jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {});
jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {});
jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {});
jest.spyOn(worker, 'initRedis');
jest.spyOn(RedisServicePubSubPublisher.prototype, 'init').mockImplementation(async () => {});
jest
.spyOn(RedisServicePubSubPublisher.prototype, 'publishToEventLog')
.mockImplementation(async () => {});
jest
.spyOn(RedisServicePubSubSubscriber.prototype, 'subscribeToCommandChannel')
.mockImplementation(async () => {});
jest
.spyOn(RedisServicePubSubSubscriber.prototype, 'addMessageHandler')
.mockImplementation(async () => {});
jest.spyOn(worker, 'initQueue').mockImplementation(async () => {});
await worker.init();
expect(worker.uniqueInstanceId).toBeDefined();
expect(worker.uniqueInstanceId).toContain('worker');
expect(worker.uniqueInstanceId.length).toBeGreaterThan(15);
expect(worker.initLicense).toHaveBeenCalled();
expect(worker.initBinaryManager).toHaveBeenCalled();
expect(worker.initExternalHooks).toHaveBeenCalled();
expect(worker.initExternalSecrets).toHaveBeenCalled();
expect(worker.initEventBus).toHaveBeenCalled();
expect(worker.initRedis).toHaveBeenCalled();
expect(worker.redisPublisher).toBeDefined();
expect(worker.redisPublisher.init).toHaveBeenCalled();
expect(worker.redisPublisher.publishToEventLog).toHaveBeenCalled();
expect(worker.redisSubscriber).toBeDefined();
expect(worker.redisSubscriber.subscribeToCommandChannel).toHaveBeenCalled();
expect(worker.redisSubscriber.addMessageHandler).toHaveBeenCalled();
expect(worker.initQueue).toHaveBeenCalled();
jest.restoreAllMocks();
});

View file

@ -0,0 +1,140 @@
import Container from 'typedi';
import config from '@/config';
import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import { OrchestrationService } from '@/services/orchestration.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
import { eventBus } from '@/eventbus';
import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers';
import { RedisService } from '@/services/redis.service';
import { mockInstance } from '../../integration/shared/utils';
const os = Container.get(OrchestrationService);
function setDefaultConfig() {
config.set('executions.mode', 'queue');
}
const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
senderId: 'test',
workerId: 'test',
command: 'restartEventBus',
payload: {
result: 'success',
},
};
const eventBusMessage = new EventMessageWorkflow({
eventName: 'n8n.workflow.success',
id: 'test',
message: 'test',
payload: {
test: 'test',
},
});
describe('Orchestration Service', () => {
beforeAll(async () => {
mockInstance(RedisService);
LoggerProxy.init(getLogger());
jest.mock('ioredis', () => {
const Redis = require('ioredis-mock');
if (typeof Redis === 'object') {
// the first mock is an ioredis shim because ioredis-mock depends on it
// https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111
return {
Command: { _transformer: { argument: {}, reply: {} } },
};
}
// second mock for our code
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return function (...args: any) {
return new Redis(args);
};
});
jest.mock('../../../src/services/redis/RedisServicePubSubPublisher', () => {
return jest.fn().mockImplementation(() => {
return {
init: jest.fn(),
publishToEventLog: jest.fn(),
publishToWorkerChannel: jest.fn(),
destroy: jest.fn(),
};
});
});
jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber', () => {
return jest.fn().mockImplementation(() => {
return {
subscribeToCommandChannel: jest.fn(),
destroy: jest.fn(),
};
});
});
setDefaultConfig();
});
afterAll(async () => {
jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks();
jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks();
});
test('should initialize', async () => {
await os.init('test-orchestration-service');
expect(os.redisPublisher).toBeDefined();
expect(os.redisSubscriber).toBeDefined();
expect(os.uniqueInstanceId).toBeDefined();
});
test('should handle worker responses', async () => {
const response = await os.handleWorkerResponseMessage(
JSON.stringify(workerRestartEventbusResponse),
);
expect(response.command).toEqual('restartEventBus');
});
test('should handle event messages', async () => {
const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage));
jest.spyOn(eventBus, 'send');
jest.spyOn(EventHelpers, 'getEventMessageObjectByType');
expect(eventBus.send).toHaveBeenCalled();
expect(response.eventName).toEqual('n8n.workflow.success');
jest.spyOn(eventBus, 'send').mockRestore();
jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore();
});
test('should handle command messages from others', async () => {
jest.spyOn(eventBus, 'restart');
const responseFalseId = await os.handleCommandMessage(
JSON.stringify(workerRestartEventbusResponse),
);
expect(responseFalseId).toBeDefined();
expect(responseFalseId!.command).toEqual('restartEventBus');
expect(responseFalseId!.senderId).toEqual('test');
expect(eventBus.restart).toHaveBeenCalled();
jest.spyOn(eventBus, 'restart').mockRestore();
});
test('should reject command messages from iteslf', async () => {
jest.spyOn(eventBus, 'restart');
const response = await os.handleCommandMessage(
JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }),
);
expect(response).toBeDefined();
expect(response!.command).toEqual('restartEventBus');
expect(response!.senderId).toEqual(os.uniqueInstanceId);
expect(eventBus.restart).not.toHaveBeenCalled();
jest.spyOn(eventBus, 'restart').mockRestore();
});
test('should send command messages', async () => {
jest.spyOn(os.redisPublisher, 'publishToCommandChannel');
await os.getWorkerIds();
expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled();
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore();
});
afterAll(async () => {
await os.shutdown();
});
});