refactor(core): Have one orchestration service per instance type (#7303)

webhook instances will not listen to either worker or event log messages
on the Redis pub/sub channel
This commit is contained in:
Michael Auerswald 2023-10-06 13:58:11 +02:00 committed by GitHub
parent 193181a9c6
commit afa683a06f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 380 additions and 215 deletions

View file

@ -19,8 +19,6 @@ import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks';
import { webhookRequestHandler } from '@/WebhookHelpers'; import { webhookRequestHandler } from '@/WebhookHelpers';
import { generateHostInstanceId } from './databases/utils/generators'; import { generateHostInstanceId } from './databases/utils/generators';
import { OrchestrationService } from './services/orchestration.service';
import { OrchestrationHandlerService } from './services/orchestration.handler.service';
export abstract class AbstractServer { export abstract class AbstractServer {
protected server: Server; protected server: Server;
@ -115,12 +113,6 @@ export abstract class AbstractServer {
else res.send('n8n is starting up. Please wait'); else res.send('n8n is starting up. Please wait');
} else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); } else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
}); });
if (config.getEnv('executions.mode') === 'queue') {
// will start the redis connections
await Container.get(OrchestrationService).init();
await Container.get(OrchestrationHandlerService).init();
}
} }
async init(): Promise<void> { async init(): Promise<void> {

View file

@ -20,7 +20,7 @@ import {
import { License } from '@/License'; import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
const logger = getLogger(); const logger = getLogger();
@ -83,7 +83,7 @@ export class ExternalSecretsManager {
} }
async broadcastReloadExternalSecretsProviders() { async broadcastReloadExternalSecretsProviders() {
await Container.get(OrchestrationService).broadcastReloadExternalSecretsProviders(); await Container.get(OrchestrationMainService).broadcastReloadExternalSecretsProviders();
} }
private async getEncryptionKey(): Promise<string> { private async getEncryptionKey(): Promise<string> {

View file

@ -31,6 +31,8 @@ import { InternalHooks } from '@/InternalHooks';
import { License } from '@/License'; import { License } from '@/License';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { IConfig } from '@oclif/config'; import { IConfig } from '@oclif/config';
import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open'); const open = require('open');
@ -214,6 +216,8 @@ export class Start extends BaseCommand {
await this.initLicense(); await this.initLicense();
this.logger.debug('License init complete'); this.logger.debug('License init complete');
await this.initOrchestration();
this.logger.debug('Orchestration init complete');
await this.initBinaryDataService(); await this.initBinaryDataService();
this.logger.debug('Binary data service init complete'); this.logger.debug('Binary data service init complete');
await this.initExternalHooks(); await this.initExternalHooks();
@ -228,6 +232,13 @@ export class Start extends BaseCommand {
} }
} }
async initOrchestration() {
if (config.get('executions.mode') === 'queue') {
await Container.get(OrchestrationMainService).init();
await Container.get(OrchestrationHandlerMainService).init();
}
}
async run() { async run() {
// eslint-disable-next-line @typescript-eslint/no-shadow // eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Start); const { flags } = this.parse(Start);

View file

@ -7,6 +7,8 @@ import { Queue } from '@/Queue';
import { BaseCommand } from './BaseCommand'; import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { IConfig } from '@oclif/config'; import { IConfig } from '@oclif/config';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { OrchestrationHandlerWebhookService } from '@/services/orchestration/webhook/orchestration.handler.webhook.service';
export class Webhook extends BaseCommand { export class Webhook extends BaseCommand {
static description = 'Starts n8n webhook process. Intercepts only production URLs.'; static description = 'Starts n8n webhook process. Intercepts only production URLs.';
@ -94,6 +96,8 @@ export class Webhook extends BaseCommand {
await this.initLicense(); await this.initLicense();
this.logger.debug('License init complete'); this.logger.debug('License init complete');
await this.initOrchestration();
this.logger.debug('Orchestration init complete');
await this.initBinaryDataService(); await this.initBinaryDataService();
this.logger.debug('Binary data service init complete'); this.logger.debug('Binary data service init complete');
await this.initExternalHooks(); await this.initExternalHooks();
@ -115,4 +119,9 @@ export class Webhook extends BaseCommand {
async catch(error: Error) { async catch(error: Error) {
await this.exitWithCrash('Exiting due to an error.', error); await this.exitWithCrash('Exiting due to an error.', error);
} }
async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
await Container.get(OrchestrationHandlerWebhookService).init();
}
} }

View file

@ -32,12 +32,12 @@ import { OwnershipService } from '@/services/ownership.service';
import type { ICredentialsOverwrite } from '@/Interfaces'; import type { ICredentialsOverwrite } from '@/Interfaces';
import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { rawBodyReader, bodyParser } from '@/middlewares'; import { rawBodyReader, bodyParser } from '@/middlewares';
import { eventBus } from '../eventbus'; import { eventBus } from '@/eventbus';
import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric';
import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric';
import { getWorkerCommandReceivedHandler } from '../worker/workerCommandHandler';
import { IConfig } from '@oclif/config'; import { IConfig } from '@oclif/config';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
export class Worker extends BaseCommand { export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker'; static description = '\nStarts a n8n worker';
@ -58,8 +58,6 @@ export class Worker extends BaseCommand {
static jobQueue: JobQueue; static jobQueue: JobQueue;
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber; redisSubscriber: RedisServicePubSubSubscriber;
/** /**
@ -272,10 +270,20 @@ export class Worker extends BaseCommand {
this.logger.debug('External secrets init complete'); this.logger.debug('External secrets init complete');
await this.initEventBus(); await this.initEventBus();
this.logger.debug('Event bus init complete'); this.logger.debug('Event bus init complete');
await this.initRedis();
this.logger.debug('Redis init complete');
await this.initQueue(); await this.initQueue();
this.logger.debug('Queue init complete'); this.logger.debug('Queue init complete');
await this.initOrchestration();
this.logger.debug('Orchestration init complete');
await this.initQueue();
await Container.get(OrchestrationWorkerService).publishToEventLog(
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.queueModeId,
},
}),
);
} }
async initEventBus() { async initEventBus() {
@ -290,29 +298,14 @@ export class Worker extends BaseCommand {
* A subscription connection to redis is created to subscribe to commands from the main process * A subscription connection to redis is created to subscribe to commands from the main process
* The subscription connection adds a handler to handle the command messages * The subscription connection adds a handler to handle the command messages
*/ */
async initRedis() { async initOrchestration() {
this.redisPublisher = Container.get(RedisServicePubSubPublisher); await Container.get(OrchestrationWorkerService).init();
this.redisSubscriber = Container.get(RedisServicePubSubSubscriber); await Container.get(OrchestrationHandlerWorkerService).initWithOptions({
await this.redisPublisher.init();
await this.redisPublisher.publishToEventLog(
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.queueModeId,
},
}),
);
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'WorkerCommandReceivedHandler',
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({
queueModeId: this.queueModeId, queueModeId: this.queueModeId,
instanceId: this.instanceId, instanceId: this.instanceId,
redisPublisher: this.redisPublisher, redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs), getRunningJobIds: () => Object.keys(Worker.runningJobs),
}), });
);
} }
async initQueue() { async initQueue() {

View file

@ -1,13 +1,13 @@
import { Authorized, Get, RestController } from '@/decorators'; import { Authorized, Get, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests'; import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
@Authorized(['global', 'owner']) @Authorized(['global', 'owner'])
@RestController('/orchestration') @RestController('/orchestration')
@Service() @Service()
export class OrchestrationController { export class OrchestrationController {
constructor(private readonly orchestrationService: OrchestrationService) {} constructor(private readonly orchestrationService: OrchestrationMainService) {}
/** /**
* These endpoint currently do not return anything, they just trigger the messsage to * These endpoint currently do not return anything, they just trigger the messsage to

View file

@ -31,7 +31,7 @@ import Container, { Service } from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { OrchestrationService } from '../../services/orchestration.service'; import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@ -190,7 +190,9 @@ export class MessageEventBus extends EventEmitter {
this.destinations[destination.getId()] = destination; this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening(); this.destinations[destination.getId()].startListening();
if (notifyWorkers) { if (notifyWorkers) {
await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); await Container.get(
OrchestrationMainService,
).broadcastRestartEventbusAfterDestinationUpdate();
} }
return destination; return destination;
} }
@ -216,7 +218,9 @@ export class MessageEventBus extends EventEmitter {
delete this.destinations[id]; delete this.destinations[id];
} }
if (notifyWorkers) { if (notifyWorkers) {
await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); await Container.get(
OrchestrationMainService,
).broadcastRestartEventbusAfterDestinationUpdate();
} }
return result; return result;
} }

View file

@ -0,0 +1,50 @@
import Container from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import config from '@/config';
export abstract class OrchestrationService {
protected initialized = false;
redisPublisher: RedisServicePubSubPublisher;
readonly redisService: RedisService;
get isQueueMode(): boolean {
return config.get('executions.mode') === 'queue';
}
get isMainInstance(): boolean {
return config.get('generic.instanceType') === 'main';
}
get isWebhookInstance(): boolean {
return config.get('generic.instanceType') === 'webhook';
}
get isWorkerInstance(): boolean {
return config.get('generic.instanceType') === 'worker';
}
constructor() {
this.redisService = Container.get(RedisService);
}
sanityCheck(): boolean {
return this.initialized && this.isQueueMode;
}
async init() {
await this.initPublisher();
this.initialized = true;
}
async shutdown() {
await this.redisPublisher?.destroy();
this.initialized = false;
}
private async initPublisher() {
this.redisPublisher = await this.redisService.getPubSubPublisher();
}
}

View file

@ -0,0 +1,33 @@
import Container from 'typedi';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
export abstract class OrchestrationHandlerService {
protected initialized = false;
redisSubscriber: RedisServicePubSubSubscriber;
readonly redisService: RedisService;
constructor() {
this.redisService = Container.get(RedisService);
}
async init() {
await this.initSubscriber();
this.initialized = true;
}
async initWithOptions(options: WorkerCommandReceivedHandlerOptions) {
await this.initSubscriber(options);
this.initialized = true;
}
async shutdown() {
await this.redisSubscriber?.destroy();
this.initialized = false;
}
protected abstract initSubscriber(options?: WorkerCommandReceivedHandlerOptions): Promise<void>;
}

View file

@ -1,47 +0,0 @@
import Container, { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './redis/RedisServiceHelper';
import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from './orchestration/handleCommandMessage';
import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus';
@Service()
export class OrchestrationHandlerService {
redisSubscriber: RedisServicePubSubSubscriber;
constructor(readonly redisService: RedisService) {}
async init() {
await this.initSubscriber();
}
async shutdown() {
await this.redisSubscriber?.destroy();
}
private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel();
await this.redisSubscriber.subscribeToEventLog();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessage(messageString);
} else if (channel === EVENT_BUS_REDIS_CHANNEL) {
await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString);
}
},
);
}
}

View file

@ -1,79 +0,0 @@
import { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import config from '@/config';
@Service()
export class OrchestrationService {
private initialized = false;
redisPublisher: RedisServicePubSubPublisher;
get isQueueMode() {
return config.getEnv('executions.mode') === 'queue';
}
constructor(readonly redisService: RedisService) {}
async init() {
await this.initPublisher();
this.initialized = true;
}
async shutdown() {
await this.redisPublisher?.destroy();
}
private async initPublisher() {
this.redisPublisher = await this.redisService.getPubSubPublisher();
}
async getWorkerStatus(id?: string) {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'getStatus',
targets: id ? [id] : undefined,
});
}
async getWorkerIds() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'getId',
});
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'restartEventBus',
});
}
async broadcastReloadExternalSecretsProviders() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadExternalSecretsProviders',
});
}
}

View file

@ -2,6 +2,10 @@ import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
export interface RedisServiceCommandLastReceived {
[date: string]: Date;
}
export function messageToRedisServiceCommandObject(messageString: string) { export function messageToRedisServiceCommandObject(messageString: string) {
if (!messageString) return; if (!messageString) return;
let message: RedisServiceCommandObject; let message: RedisServiceCommandObject;
@ -15,3 +19,15 @@ export function messageToRedisServiceCommandObject(messageString: string) {
} }
return message; return message;
} }
const lastReceived: RedisServiceCommandLastReceived = {};
export function debounceMessageReceiver(message: RedisServiceCommandObject, timeout: number = 100) {
const now = new Date();
const lastReceivedDate = lastReceived[message.command];
if (lastReceivedDate && now.getTime() - lastReceivedDate.getTime() < timeout) {
return false;
}
lastReceived[message.command] = now;
return true;
}

View file

@ -1,17 +1,14 @@
import { LoggerProxy } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow';
import { messageToRedisServiceCommandObject } from './helpers'; import { debounceMessageReceiver, messageToRedisServiceCommandObject } from '../helpers';
import config from '@/config'; import config from '@/config';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import Container from 'typedi'; import Container from 'typedi';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import type { N8nInstanceType } from '@/Interfaces';
import { License } from '@/License'; import { License } from '@/License';
// this function handles commands sent to the MAIN instance. the workers handle their own commands export async function handleCommandMessageMain(messageString: string) {
export async function handleCommandMessage(messageString: string) {
const queueModeId = config.get('redis.queueModeId'); const queueModeId = config.get('redis.queueModeId');
const instanceType = config.get('generic.instanceType') as N8nInstanceType; const isMainInstance = config.get('generic.instanceType') === 'main';
const isMainInstance = instanceType === 'main';
const message = messageToRedisServiceCommandObject(messageString); const message = messageToRedisServiceCommandObject(messageString);
if (message) { if (message) {
@ -30,6 +27,12 @@ export async function handleCommandMessage(messageString: string) {
} }
switch (message.command) { switch (message.command) {
case 'reloadLicense': case 'reloadLicense':
if (!debounceMessageReceiver(message, 500)) {
message.payload = {
result: 'debounced',
};
return message;
}
if (isMainInstance) { if (isMainInstance) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
LoggerProxy.error( LoggerProxy.error(
@ -40,8 +43,20 @@ export async function handleCommandMessage(messageString: string) {
await Container.get(License).reload(); await Container.get(License).reload();
break; break;
case 'restartEventBus': case 'restartEventBus':
if (!debounceMessageReceiver(message, 200)) {
message.payload = {
result: 'debounced',
};
return message;
}
await Container.get(MessageEventBus).restart(); await Container.get(MessageEventBus).restart();
case 'reloadExternalSecretsProviders': case 'reloadExternalSecretsProviders':
if (!debounceMessageReceiver(message, 200)) {
message.payload = {
result: 'debounced',
};
return message;
}
await Container.get(ExternalSecretsManager).reloadAllProviders(); await Container.get(ExternalSecretsManager).reloadAllProviders();
default: default:
break; break;

View file

@ -1,7 +1,7 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow'; import { jsonParse, LoggerProxy } from 'n8n-workflow';
import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands'; import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands';
export async function handleWorkerResponseMessage(messageString: string) { export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString); const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) { if (workerResponse) {
// TODO: Handle worker response // TODO: Handle worker response

View file

@ -0,0 +1,34 @@
import Container, { Service } from 'typedi';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from '../../redis/RedisServiceHelper';
import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain';
import { handleCommandMessageMain } from './handleCommandMessageMain';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
@Service()
export class OrchestrationHandlerMainService extends OrchestrationHandlerService {
async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToCommandChannel();
await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToEventLog();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessageMain(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageMain(messageString);
} else if (channel === EVENT_BUS_REDIS_CHANNEL) {
await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString);
}
},
);
}
}

View file

@ -0,0 +1,38 @@
import { Service } from 'typedi';
import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationMainService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isMainInstance;
}
async getWorkerStatus(id?: string) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'getStatus',
targets: id ? [id] : undefined,
});
}
async getWorkerIds() {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'getId',
});
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'restartEventBus',
});
}
async broadcastReloadExternalSecretsProviders() {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'reloadExternalSecretsProviders',
});
}
}

View file

@ -0,0 +1,6 @@
import { handleCommandMessageMain } from '../main/handleCommandMessageMain';
export async function handleCommandMessageWebhook(messageString: string) {
// currently webhooks handle commands the same way as the main instance
return handleCommandMessageMain(messageString);
}

View file

@ -0,0 +1,22 @@
import { Service } from 'typedi';
import { COMMAND_REDIS_CHANNEL } from '../../redis/RedisServiceHelper';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import { handleCommandMessageWebhook } from './handleCommandMessageWebhook';
@Service()
export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService {
async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageWebhook(messageString);
}
},
);
}
}

View file

@ -0,0 +1,9 @@
import { Service } from 'typedi';
import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationWebhookService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWebhookInstance;
}
}

View file

@ -5,15 +5,18 @@ import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServiceP
import * as os from 'os'; import * as os from 'os';
import Container from 'typedi'; import Container from 'typedi';
import { License } from '@/License'; import { License } from '@/License';
import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '../ExternalSecrets/ExternalSecretsManager.ee'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { debounceMessageReceiver } from '../helpers';
export function getWorkerCommandReceivedHandler(options: { export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string; queueModeId: string;
instanceId: string; instanceId: string;
redisPublisher: RedisServicePubSubPublisher; redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[]; getRunningJobIds: () => string[];
}) { }
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
return async (channel: string, messageString: string) => { return async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) { if (channel === COMMAND_REDIS_CHANNEL) {
if (!messageString) return; if (!messageString) return;
@ -35,6 +38,7 @@ export function getWorkerCommandReceivedHandler(options: {
} }
switch (message.command) { switch (message.command) {
case 'getStatus': case 'getStatus':
if (!debounceMessageReceiver(message, 200)) return;
await options.redisPublisher.publishToWorkerChannel({ await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId, workerId: options.queueModeId,
command: message.command, command: message.command,
@ -57,12 +61,14 @@ export function getWorkerCommandReceivedHandler(options: {
}); });
break; break;
case 'getId': case 'getId':
if (!debounceMessageReceiver(message, 200)) return;
await options.redisPublisher.publishToWorkerChannel({ await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId, workerId: options.queueModeId,
command: message.command, command: message.command,
}); });
break; break;
case 'restartEventBus': case 'restartEventBus':
if (!debounceMessageReceiver(message, 100)) return;
try { try {
await Container.get(MessageEventBus).restart(); await Container.get(MessageEventBus).restart();
await options.redisPublisher.publishToWorkerChannel({ await options.redisPublisher.publishToWorkerChannel({
@ -84,6 +90,7 @@ export function getWorkerCommandReceivedHandler(options: {
} }
break; break;
case 'reloadExternalSecretsProviders': case 'reloadExternalSecretsProviders':
if (!debounceMessageReceiver(message, 200)) return;
try { try {
await Container.get(ExternalSecretsManager).reloadAllProviders(); await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.redisPublisher.publishToWorkerChannel({ await options.redisPublisher.publishToWorkerChannel({
@ -105,9 +112,11 @@ export function getWorkerCommandReceivedHandler(options: {
} }
break; break;
case 'reloadLicense': case 'reloadLicense':
if (!debounceMessageReceiver(message, 500)) return;
await Container.get(License).reload(); await Container.get(License).reload();
break; break;
case 'stopWorker': case 'stopWorker':
if (!debounceMessageReceiver(message, 500)) return;
// TODO: implement proper shutdown // TODO: implement proper shutdown
// await this.stopProcess(); // await this.stopProcess();
break; break;

View file

@ -0,0 +1,17 @@
import { Service } from 'typedi';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
@Service()
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
async initSubscriber(options: WorkerCommandReceivedHandlerOptions) {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'WorkerCommandReceivedHandler',
getWorkerCommandReceivedHandler(options),
);
}
}

View file

@ -0,0 +1,15 @@
import { Service } from 'typedi';
import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage';
import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationWorkerService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWorkerInstance;
}
async publishToEventLog(message: AbstractEventMessage) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToEventLog(message);
}
}

View file

@ -17,6 +17,8 @@ import { NodeTypes } from '@/NodeTypes';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog'; import { PostHogClient } from '@/posthog';
import { RedisService } from '@/services/redis.service'; import { RedisService } from '@/services/redis.service';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname }); const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
@ -48,17 +50,14 @@ test('worker initializes all its components', async () => {
jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {}); jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {});
jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {}); jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {});
jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {}); jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {});
jest.spyOn(worker, 'initRedis'); jest.spyOn(worker, 'initOrchestration');
jest
.spyOn(OrchestrationWorkerService.prototype, 'publishToEventLog')
.mockImplementation(async () => {});
jest
.spyOn(OrchestrationHandlerWorkerService.prototype, 'initSubscriber')
.mockImplementation(async () => {});
jest.spyOn(RedisServicePubSubPublisher.prototype, 'init').mockImplementation(async () => {}); jest.spyOn(RedisServicePubSubPublisher.prototype, 'init').mockImplementation(async () => {});
jest
.spyOn(RedisServicePubSubPublisher.prototype, 'publishToEventLog')
.mockImplementation(async () => {});
jest
.spyOn(RedisServicePubSubSubscriber.prototype, 'subscribeToCommandChannel')
.mockImplementation(async () => {});
jest
.spyOn(RedisServicePubSubSubscriber.prototype, 'addMessageHandler')
.mockImplementation(async () => {});
jest.spyOn(worker, 'initQueue').mockImplementation(async () => {}); jest.spyOn(worker, 'initQueue').mockImplementation(async () => {});
await worker.init(); await worker.init();
@ -71,13 +70,9 @@ test('worker initializes all its components', async () => {
expect(worker.initExternalHooks).toHaveBeenCalled(); expect(worker.initExternalHooks).toHaveBeenCalled();
expect(worker.initExternalSecrets).toHaveBeenCalled(); expect(worker.initExternalSecrets).toHaveBeenCalled();
expect(worker.initEventBus).toHaveBeenCalled(); expect(worker.initEventBus).toHaveBeenCalled();
expect(worker.initRedis).toHaveBeenCalled(); expect(worker.initOrchestration).toHaveBeenCalled();
expect(worker.redisPublisher).toBeDefined(); expect(OrchestrationHandlerWorkerService.prototype.initSubscriber).toHaveBeenCalled();
expect(worker.redisPublisher.init).toHaveBeenCalled(); expect(OrchestrationWorkerService.prototype.publishToEventLog).toHaveBeenCalled();
expect(worker.redisPublisher.publishToEventLog).toHaveBeenCalled();
expect(worker.redisSubscriber).toBeDefined();
expect(worker.redisSubscriber.subscribeToCommandChannel).toHaveBeenCalled();
expect(worker.redisSubscriber.addMessageHandler).toHaveBeenCalled();
expect(worker.initQueue).toHaveBeenCalled(); expect(worker.initQueue).toHaveBeenCalled();
jest.restoreAllMocks(); jest.restoreAllMocks();

View file

@ -91,9 +91,7 @@ beforeAll(async () => {
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
config.set('eventBus.logWriter.keepLogCount', 1); config.set('eventBus.logWriter.keepLogCount', 1);
await eventBus.initialize({ await eventBus.initialize({});
uniqueInstanceId: 'test',
});
}); });
afterAll(async () => { afterAll(async () => {

View file

@ -2,22 +2,25 @@ import Container from 'typedi';
import config from '@/config'; import config from '@/config';
import { LoggerProxy } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger'; import { getLogger } from '@/Logger';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
import { eventBus } from '@/eventbus'; import { eventBus } from '@/eventbus';
import { RedisService } from '@/services/redis.service'; import { RedisService } from '@/services/redis.service';
import { mockInstance } from '../../integration/shared/utils'; import { mockInstance } from '../../integration/shared/utils';
import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handleWorkerResponseMessageMain';
import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; import { handleCommandMessageMain } from '@/services/orchestration/main/handleCommandMessageMain';
import { OrchestrationHandlerService } from '../../../src/services/orchestration.handler.service'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import * as helpers from '@/services/orchestration/helpers';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
const os = Container.get(OrchestrationService); const os = Container.get(OrchestrationMainService);
const handler = Container.get(OrchestrationHandlerService); const handler = Container.get(OrchestrationHandlerMainService);
let queueModeId: string; let queueModeId: string;
function setDefaultConfig() { function setDefaultConfig() {
config.set('executions.mode', 'queue'); config.set('executions.mode', 'queue');
config.set('generic.instanceType', 'main');
} }
const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
@ -32,6 +35,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
describe('Orchestration Service', () => { describe('Orchestration Service', () => {
beforeAll(async () => { beforeAll(async () => {
mockInstance(RedisService); mockInstance(RedisService);
mockInstance(ExternalSecretsManager);
LoggerProxy.init(getLogger()); LoggerProxy.init(getLogger());
jest.mock('ioredis', () => { jest.mock('ioredis', () => {
const Redis = require('ioredis-mock'); const Redis = require('ioredis-mock');
@ -85,7 +89,7 @@ describe('Orchestration Service', () => {
}); });
test('should handle worker responses', async () => { test('should handle worker responses', async () => {
const response = await handleWorkerResponseMessage( const response = await handleWorkerResponseMessageMain(
JSON.stringify(workerRestartEventbusResponse), JSON.stringify(workerRestartEventbusResponse),
); );
expect(response.command).toEqual('restartEventBus'); expect(response.command).toEqual('restartEventBus');
@ -93,7 +97,7 @@ describe('Orchestration Service', () => {
test('should handle command messages from others', async () => { test('should handle command messages from others', async () => {
jest.spyOn(LoggerProxy, 'error'); jest.spyOn(LoggerProxy, 'error');
const responseFalseId = await handleCommandMessage( const responseFalseId = await handleCommandMessageMain(
JSON.stringify({ JSON.stringify({
senderId: 'test', senderId: 'test',
command: 'reloadLicense', command: 'reloadLicense',
@ -108,7 +112,7 @@ describe('Orchestration Service', () => {
test('should reject command messages from iteslf', async () => { test('should reject command messages from iteslf', async () => {
jest.spyOn(eventBus, 'restart'); jest.spyOn(eventBus, 'restart');
const response = await handleCommandMessage( const response = await handleCommandMessageMain(
JSON.stringify({ ...workerRestartEventbusResponse, senderId: queueModeId }), JSON.stringify({ ...workerRestartEventbusResponse, senderId: queueModeId }),
); );
expect(response).toBeDefined(); expect(response).toBeDefined();
@ -119,9 +123,30 @@ describe('Orchestration Service', () => {
}); });
test('should send command messages', async () => { test('should send command messages', async () => {
jest.spyOn(os.redisPublisher, 'publishToCommandChannel'); setDefaultConfig();
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockImplementation(async () => {});
await os.getWorkerIds(); await os.getWorkerIds();
expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled();
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore();
}); });
test('should prevent receiving commands too often', async () => {
setDefaultConfig();
jest.spyOn(helpers, 'debounceMessageReceiver');
const res1 = await handleCommandMessageMain(
JSON.stringify({
senderId: 'test',
command: 'reloadExternalSecretsProviders',
}),
);
const res2 = await handleCommandMessageMain(
JSON.stringify({
senderId: 'test',
command: 'reloadExternalSecretsProviders',
}),
);
expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2);
expect(res1!.payload).toBeUndefined();
expect(res2!.payload!.result).toEqual('debounced');
});
}); });