diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 092b1a95a8..0170b8a9f9 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -20,6 +20,7 @@ import { WaitingWebhooks } from '@/WaitingWebhooks'; import { webhookRequestHandler } from '@/WebhookHelpers'; import { generateHostInstanceId } from './databases/utils/generators'; import { OrchestrationService } from './services/orchestration.service'; +import { OrchestrationHandlerService } from './services/orchestration.handler.service'; export abstract class AbstractServer { protected server: Server; @@ -118,6 +119,7 @@ export abstract class AbstractServer { if (config.getEnv('executions.mode') === 'queue') { // will start the redis connections await Container.get(OrchestrationService).init(); + await Container.get(OrchestrationHandlerService).init(); } } diff --git a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts index 6a0b422377..e8b0e00589 100644 --- a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts +++ b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts @@ -20,6 +20,7 @@ import { import { License } from '@/License'; import { InternalHooks } from '@/InternalHooks'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; const logger = getLogger(); @@ -70,6 +71,21 @@ export class ExternalSecretsManager { Object.values(this.initRetryTimeouts).forEach((v) => clearTimeout(v)); } + async reloadAllProviders(backoff?: number) { + logger.debug('Reloading all external secrets providers'); + const providers = this.getProviderNames(); + if (!providers) { + return; + } + for (const provider of providers) { + await this.reloadProvider(provider, backoff); + } + } + + async broadcastReloadExternalSecretsProviders() { + await Container.get(OrchestrationService).broadcastReloadExternalSecretsProviders(); + } + private async getEncryptionKey(): Promise { return UserSettings.getEncryptionKey(); } @@ -274,6 +290,7 @@ export class ExternalSecretsManager { await this.saveAndSetSettings(settings, this.settingsRepo); this.cachedSettings = settings; await this.reloadProvider(provider); + await this.broadcastReloadExternalSecretsProviders(); void this.trackProviderSave(provider, isNewProvider, userId); } @@ -293,6 +310,7 @@ export class ExternalSecretsManager { this.cachedSettings = settings; await this.reloadProvider(provider); await this.updateSecrets(); + await this.broadcastReloadExternalSecretsProviders(); } private async trackProviderSave(vaultType: string, isNew: boolean, userId?: string) { @@ -373,6 +391,7 @@ export class ExternalSecretsManager { } try { await this.providers[provider].update(); + await this.broadcastReloadExternalSecretsProviders(); return true; } catch { return false; diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 75446f511f..764ee0c31f 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -94,14 +94,12 @@ export abstract class BaseCommand extends Command { } protected setInstanceQueueModeId() { - if (config.getEnv('executions.mode') === 'queue') { - if (config.get('redis.queueModeId')) { - this.queueModeId = config.get('redis.queueModeId'); - return; - } - this.queueModeId = generateHostInstanceId(this.instanceType); - config.set('redis.queueModeId', this.queueModeId); + if (config.get('redis.queueModeId')) { + this.queueModeId = config.get('redis.queueModeId'); + return; } + this.queueModeId = generateHostInstanceId(this.instanceType); + config.set('redis.queueModeId', this.queueModeId); } protected async stopProcess() { diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 7d674379dc..05c0f7fee3 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -29,12 +29,9 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import Container, { Service } from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; -import { RedisService } from '@/services/redis.service'; -import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; -import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; -import { EVENT_BUS_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; +import { OrchestrationService } from '../../services/orchestration.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -54,10 +51,6 @@ export class MessageEventBus extends EventEmitter { isInitialized: boolean; - redisPublisher: RedisServicePubSubPublisher; - - redisSubscriber: RedisServicePubSubSubscriber; - logWriter: MessageEventBusLogWriter; destinations: { @@ -91,20 +84,6 @@ export class MessageEventBus extends EventEmitter { return; } - if (config.getEnv('executions.mode') === 'queue') { - this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); - this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber(); - await this.redisSubscriber.subscribeToEventLog(); - this.redisSubscriber.addMessageHandler( - 'MessageEventBusMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === EVENT_BUS_REDIS_CHANNEL) { - await this.handleRedisEventBusMessage(messageString); - } - }, - ); - } - LoggerProxy.debug('Initializing event bus...'); const savedEventDestinations = await Db.collections.EventDestinations.find({}); @@ -211,7 +190,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await this.broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); } return destination; } @@ -237,7 +216,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await this.broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); } return result; } @@ -253,14 +232,6 @@ export class MessageEventBus extends EventEmitter { return eventData; } - async broadcastRestartEventbusAfterDestinationUpdate() { - if (config.getEnv('executions.mode') === 'queue') { - await this.redisPublisher.publishToCommandChannel({ - command: 'restartEventBus', - }); - } - } - private async trySendingUnsent(msgs?: EventMessageTypes[]) { const unsentMessages = msgs ?? (await this.getEventsUnsent()); if (unsentMessages.length > 0) { @@ -281,7 +252,6 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } - await this.redisSubscriber?.unSubscribeFromEventLog(); this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } diff --git a/packages/cli/src/services/orchestration.handler.service.ts b/packages/cli/src/services/orchestration.handler.service.ts new file mode 100644 index 0000000000..aa64926068 --- /dev/null +++ b/packages/cli/src/services/orchestration.handler.service.ts @@ -0,0 +1,47 @@ +import Container, { Service } from 'typedi'; +import { RedisService } from './redis.service'; +import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './redis/RedisServiceHelper'; +import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; +import { handleCommandMessage } from './orchestration/handleCommandMessage'; +import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; + +@Service() +export class OrchestrationHandlerService { + redisSubscriber: RedisServicePubSubSubscriber; + + constructor(readonly redisService: RedisService) {} + + async init() { + await this.initSubscriber(); + } + + async shutdown() { + await this.redisSubscriber?.destroy(); + } + + private async initSubscriber() { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + await this.redisSubscriber.subscribeToWorkerResponseChannel(); + await this.redisSubscriber.subscribeToCommandChannel(); + await this.redisSubscriber.subscribeToEventLog(); + + this.redisSubscriber.addMessageHandler( + 'OrchestrationMessageReceiver', + async (channel: string, messageString: string) => { + if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await handleWorkerResponseMessage(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await handleCommandMessage(messageString); + } else if (channel === EVENT_BUS_REDIS_CHANNEL) { + await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString); + } + }, + ); + } +} diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 0cd2402352..c81874ad9d 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -1,10 +1,7 @@ import { Service } from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; -import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper'; -import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; -import { handleCommandMessage } from './orchestration/handleCommandMessage'; +import config from '@/config'; @Service() export class OrchestrationService { @@ -12,44 +9,29 @@ export class OrchestrationService { redisPublisher: RedisServicePubSubPublisher; - redisSubscriber: RedisServicePubSubSubscriber; + get isQueueMode() { + return config.getEnv('executions.mode') === 'queue'; + } constructor(readonly redisService: RedisService) {} async init() { await this.initPublisher(); - await this.initSubscriber(); this.initialized = true; } async shutdown() { await this.redisPublisher?.destroy(); - await this.redisSubscriber?.destroy(); } private async initPublisher() { this.redisPublisher = await this.redisService.getPubSubPublisher(); } - private async initSubscriber() { - this.redisSubscriber = await this.redisService.getPubSubSubscriber(); - - await this.redisSubscriber.subscribeToWorkerResponseChannel(); - await this.redisSubscriber.subscribeToCommandChannel(); - - this.redisSubscriber.addMessageHandler( - 'OrchestrationMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessage(messageString); - } else if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessage(messageString); - } - }, - ); - } - async getWorkerStatus(id?: string) { + if (!this.isQueueMode) { + return; + } if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } @@ -60,6 +42,9 @@ export class OrchestrationService { } async getWorkerIds() { + if (!this.isQueueMode) { + return; + } if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } @@ -67,4 +52,28 @@ export class OrchestrationService { command: 'getId', }); } + + async broadcastRestartEventbusAfterDestinationUpdate() { + if (!this.isQueueMode) { + return; + } + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'restartEventBus', + }); + } + + async broadcastReloadExternalSecretsProviders() { + if (!this.isQueueMode) { + return; + } + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'reloadExternalSecretsProviders', + }); + } } diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/handleCommandMessage.ts index 06e08977e8..6939555cdc 100644 --- a/packages/cli/src/services/orchestration/handleCommandMessage.ts +++ b/packages/cli/src/services/orchestration/handleCommandMessage.ts @@ -1,14 +1,23 @@ import { LoggerProxy } from 'n8n-workflow'; import { messageToRedisServiceCommandObject } from './helpers'; import config from '@/config'; -import { MessageEventBus } from '../../eventbus/MessageEventBus/MessageEventBus'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import Container from 'typedi'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; +import type { N8nInstanceType } from '@/Interfaces'; +import { License } from '@/License'; // this function handles commands sent to the MAIN instance. the workers handle their own commands export async function handleCommandMessage(messageString: string) { const queueModeId = config.get('redis.queueModeId'); + const instanceType = config.get('generic.instanceType') as N8nInstanceType; + const isMainInstance = instanceType === 'main'; const message = messageToRedisServiceCommandObject(messageString); + if (message) { + LoggerProxy.debug( + `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, + ); if ( message.senderId === queueModeId || (message.targets && !message.targets.includes(queueModeId)) @@ -21,16 +30,19 @@ export async function handleCommandMessage(messageString: string) { } switch (message.command) { case 'reloadLicense': - // at this point in time, only a single main instance is supported, thus this - // command _should_ never be caught currently (which is why we log a warning) - LoggerProxy.warn( - 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', - ); - // once multiple main instances are supported, this command should be handled - // await Container.get(License).reload(); + if (isMainInstance) { + // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently + LoggerProxy.error( + 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', + ); + return message; + } + await Container.get(License).reload(); break; case 'restartEventBus': await Container.get(MessageEventBus).restart(); + case 'reloadExternalSecretsProviders': + await Container.get(ExternalSecretsManager).reloadAllProviders(); default: break; } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 634450b2e7..602b5646f5 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -3,7 +3,8 @@ export type RedisServiceCommand = | 'getId' | 'restartEventBus' | 'stopWorker' - | 'reloadLicense'; + | 'reloadLicense' + | 'reloadExternalSecretsProviders'; /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -49,6 +50,13 @@ export type RedisServiceWorkerResponseObject = { error?: string; }; } + | { + command: 'reloadExternalSecretsProviders'; + payload: { + result: 'success' | 'error'; + error?: string; + }; + } | { command: 'stopWorker'; } diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts index 285a222586..63866fda78 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/worker/workerCommandHandler.ts @@ -6,6 +6,7 @@ import * as os from 'os'; import Container from 'typedi'; import { License } from '@/License'; import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; +import { ExternalSecretsManager } from '../ExternalSecrets/ExternalSecretsManager.ee'; export function getWorkerCommandReceivedHandler(options: { queueModeId: string; @@ -26,6 +27,9 @@ export function getWorkerCommandReceivedHandler(options: { return; } if (message) { + LoggerProxy.debug( + `RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`, + ); if (message.targets && !message.targets.includes(options.queueModeId)) { return; // early return if the message is not for this worker } @@ -59,14 +63,46 @@ export function getWorkerCommandReceivedHandler(options: { }); break; case 'restartEventBus': - await Container.get(MessageEventBus).restart(); - await options.redisPublisher.publishToWorkerChannel({ - workerId: options.queueModeId, - command: message.command, - payload: { - result: 'success', - }, - }); + try { + await Container.get(MessageEventBus).restart(); + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'success', + }, + }); + } catch (error) { + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'error', + error: (error as Error).message, + }, + }); + } + break; + case 'reloadExternalSecretsProviders': + try { + await Container.get(ExternalSecretsManager).reloadAllProviders(); + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'success', + }, + }); + } catch (error) { + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'error', + error: (error as Error).message, + }, + }); + } break; case 'reloadLicense': await Container.get(License).reload(); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 9367e4da7b..a39c4bd789 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -9,8 +9,10 @@ import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; +import { OrchestrationHandlerService } from '../../../src/services/orchestration.handler.service'; const os = Container.get(OrchestrationService); +const handler = Container.get(OrchestrationHandlerService); let queueModeId: string; @@ -76,8 +78,9 @@ describe('Orchestration Service', () => { test('should initialize', async () => { await os.init(); + await handler.init(); expect(os.redisPublisher).toBeDefined(); - expect(os.redisSubscriber).toBeDefined(); + expect(handler.redisSubscriber).toBeDefined(); expect(queueModeId).toBeDefined(); }); @@ -89,7 +92,7 @@ describe('Orchestration Service', () => { }); test('should handle command messages from others', async () => { - jest.spyOn(LoggerProxy, 'warn'); + jest.spyOn(LoggerProxy, 'error'); const responseFalseId = await handleCommandMessage( JSON.stringify({ senderId: 'test', @@ -99,8 +102,8 @@ describe('Orchestration Service', () => { expect(responseFalseId).toBeDefined(); expect(responseFalseId!.command).toEqual('reloadLicense'); expect(responseFalseId!.senderId).toEqual('test'); - expect(LoggerProxy.warn).toHaveBeenCalled(); - jest.spyOn(LoggerProxy, 'warn').mockRestore(); + expect(LoggerProxy.error).toHaveBeenCalled(); + jest.spyOn(LoggerProxy, 'error').mockRestore(); }); test('should reject command messages from iteslf', async () => {