diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index a3d5e87d4b..9ca3a66d33 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -11,7 +11,7 @@ jest.useFakeTimers(); describe('WaitTracker', () => { const executionRepository = mock(); const multiMainSetup = mock(); - const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup); + const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup); const execution = mock({ id: '123', diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 539934bd99..61212049d7 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -21,6 +21,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus' import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; import { License } from '@/license'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Server } from '@/server'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { OrchestrationService } from '@/services/orchestration.service'; @@ -240,7 +241,7 @@ export class Start extends BaseCommand { await Container.get(OrchestrationHandlerMainService).initWithOptions({ queueModeId: this.queueModeId, - redisPublisher: Container.get(OrchestrationService).redisPublisher, + publisher: Container.get(Publisher), }); if (!orchestrationService.isMultiMainSetupEnabled) return; diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index b1cba17fd4..4b719e8443 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; import { JobProcessor } from '@/scaling/job-processor'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import type { ScalingService } from '@/scaling/scaling.service'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; -import type { RedisServicePubSubSubscriber } from '@/services/redis/redis-service-pub-sub-subscriber'; import { BaseCommand } from './base-command'; @@ -40,8 +40,6 @@ export class Worker extends BaseCommand { jobProcessor: JobProcessor; - redisSubscriber: RedisServicePubSubSubscriber; - override needsCommunityPackages = true; /** @@ -131,7 +129,7 @@ export class Worker extends BaseCommand { await Container.get(OrchestrationWorkerService).init(); await Container.get(OrchestrationHandlerWorkerService).initWithOptions({ queueModeId: this.queueModeId, - redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher, + publisher: Container.get(Publisher), getRunningJobIds: () => this.jobProcessor.getRunningJobIds(), getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(), }); diff --git a/packages/cli/src/license.ts b/packages/cli/src/license.ts index fde17a8fcc..75a57efd2c 100644 --- a/packages/cli/src/license.ts +++ b/packages/cli/src/license.ts @@ -18,8 +18,6 @@ import { UNLIMITED_LICENSE_QUOTA, } from './constants'; import type { BooleanLicenseFeature, NumericLicenseFeature } from './interfaces'; -import type { RedisServicePubSubPublisher } from './services/redis/redis-service-pub-sub-publisher'; -import { RedisService } from './services/redis.service'; export type FeatureReturnType = Partial< { @@ -31,8 +29,6 @@ export type FeatureReturnType = Partial< export class License { private manager: LicenseManager | undefined; - private redisPublisher: RedisServicePubSubPublisher; - private isShuttingDown = false; constructor( @@ -163,13 +159,8 @@ export class License { } if (config.getEnv('executions.mode') === 'queue') { - if (!this.redisPublisher) { - this.logger.debug('Initializing Redis publisher for License Service'); - this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'reloadLicense', - }); + const { Publisher } = await import('@/scaling/pubsub/publisher.service'); + await Container.get(Publisher).publishCommand({ command: 'reloadLicense' }); } const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3'; diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts new file mode 100644 index 0000000000..06b7fe05b4 --- /dev/null +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -0,0 +1,75 @@ +import type { Redis as SingleNodeClient } from 'ioredis'; +import { mock } from 'jest-mock-extended'; + +import config from '@/config'; +import { generateNanoId } from '@/databases/utils/generators'; +import type { RedisClientService } from '@/services/redis/redis-client.service'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from '@/services/redis/redis-service-commands'; + +import { Publisher } from '../pubsub/publisher.service'; + +describe('Publisher', () => { + let queueModeId: string; + + beforeEach(() => { + config.set('executions.mode', 'queue'); + queueModeId = generateNanoId(); + config.set('redis.queueModeId', queueModeId); + }); + + const client = mock(); + const redisClientService = mock({ createClient: () => client }); + + describe('constructor', () => { + it('should init Redis client in scaling mode', () => { + const publisher = new Publisher(mock(), redisClientService); + + expect(publisher.getClient()).toEqual(client); + }); + + it('should not init Redis client in regular mode', () => { + config.set('executions.mode', 'regular'); + const publisher = new Publisher(mock(), redisClientService); + + expect(publisher.getClient()).toBeUndefined(); + }); + }); + + describe('shutdown', () => { + it('should disconnect Redis client', () => { + const publisher = new Publisher(mock(), redisClientService); + publisher.shutdown(); + expect(client.disconnect).toHaveBeenCalled(); + }); + }); + + describe('publishCommand', () => { + it('should publish command into `n8n.commands` pubsub channel', async () => { + const publisher = new Publisher(mock(), redisClientService); + const msg = mock({ command: 'reloadLicense' }); + + await publisher.publishCommand(msg); + + expect(client.publish).toHaveBeenCalledWith( + 'n8n.commands', + JSON.stringify({ ...msg, senderId: queueModeId }), + ); + }); + }); + + describe('publishWorkerResponse', () => { + it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { + const publisher = new Publisher(mock(), redisClientService); + const msg = mock({ + command: 'reloadExternalSecretsProviders', + }); + + await publisher.publishWorkerResponse(msg); + + expect(client.publish).toHaveBeenCalledWith('n8n.worker-response', JSON.stringify(msg)); + }); + }); +}); diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts new file mode 100644 index 0000000000..96566b7152 --- /dev/null +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -0,0 +1,60 @@ +import type { Redis as SingleNodeClient } from 'ioredis'; +import { mock } from 'jest-mock-extended'; + +import config from '@/config'; +import type { RedisClientService } from '@/services/redis/redis-client.service'; + +import { Subscriber } from '../pubsub/subscriber.service'; + +describe('Subscriber', () => { + beforeEach(() => { + config.set('executions.mode', 'queue'); + }); + + const client = mock(); + const redisClientService = mock({ createClient: () => client }); + + describe('constructor', () => { + it('should init Redis client in scaling mode', () => { + const subscriber = new Subscriber(mock(), redisClientService); + + expect(subscriber.getClient()).toEqual(client); + }); + + it('should not init Redis client in regular mode', () => { + config.set('executions.mode', 'regular'); + const subscriber = new Subscriber(mock(), redisClientService); + + expect(subscriber.getClient()).toBeUndefined(); + }); + }); + + describe('shutdown', () => { + it('should disconnect Redis client', () => { + const subscriber = new Subscriber(mock(), redisClientService); + subscriber.shutdown(); + expect(client.disconnect).toHaveBeenCalled(); + }); + }); + + describe('subscribe', () => { + it('should subscribe to pubsub channel', async () => { + const subscriber = new Subscriber(mock(), redisClientService); + + await subscriber.subscribe('n8n.commands'); + + expect(client.subscribe).toHaveBeenCalledWith('n8n.commands', expect.any(Function)); + }); + }); + + describe('setHandler', () => { + it('should set handler function', () => { + const subscriber = new Subscriber(mock(), redisClientService); + const handlerFn = jest.fn(); + + subscriber.addMessageHandler(handlerFn); + + expect(client.on).toHaveBeenCalledWith('message', handlerFn); + }); + }); +}); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts new file mode 100644 index 0000000000..fee4724d87 --- /dev/null +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -0,0 +1,88 @@ +import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; +import { Service } from 'typedi'; + +import config from '@/config'; +import { Logger } from '@/logger'; +import { RedisClientService } from '@/services/redis/redis-client.service'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from '@/services/redis/redis-service-commands'; + +/** + * Responsible for publishing messages into the pubsub channels used by scaling mode. + */ +@Service() +export class Publisher { + private readonly client: SingleNodeClient | MultiNodeClient; + + // #region Lifecycle + + constructor( + private readonly logger: Logger, + private readonly redisClientService: RedisClientService, + ) { + // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. + if (config.getEnv('executions.mode') !== 'queue') return; + + this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' }); + + this.client.on('error', (error) => this.logger.error(error.message)); + } + + getClient() { + return this.client; + } + + // @TODO: Use `@OnShutdown()` decorator + shutdown() { + this.client.disconnect(); + } + + // #endregion + + // #region Publishing + + /** Publish a command into the `n8n.commands` channel. */ + async publishCommand(msg: Omit) { + await this.client.publish( + 'n8n.commands', + JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }), + ); + + this.logger.debug(`Published ${msg.command} to command channel`); + } + + /** Publish a response for a command into the `n8n.worker-response` channel. */ + async publishWorkerResponse(msg: RedisServiceWorkerResponseObject) { + await this.client.publish('n8n.worker-response', JSON.stringify(msg)); + + this.logger.debug(`Published response for ${msg.command} to worker response channel`); + } + + // #endregion + + // #region Utils for multi-main setup + + // @TODO: The following methods are not pubsub-specific. Consider a dedicated client for multi-main setup. + + async setIfNotExists(key: string, value: string) { + const success = await this.client.setnx(key, value); + + return !!success; + } + + async setExpiration(key: string, ttl: number) { + await this.client.expire(key, ttl); + } + + async get(key: string) { + return await this.client.get(key); + } + + async clear(key: string) { + await this.client?.del(key); + } + + // #endregion +} diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts new file mode 100644 index 0000000000..b7f56904b6 --- /dev/null +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -0,0 +1,14 @@ +import type { + COMMAND_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from '@/services/redis/redis-constants'; + +/** + * Pubsub channel used by scaling mode: + * + * - `n8n.commands` for messages sent by a main process to command workers or other main processes + * - `n8n.worker-response` for messages sent by workers in response to commands from main processes + */ +export type ScalingPubSubChannel = + | typeof COMMAND_REDIS_CHANNEL + | typeof WORKER_RESPONSE_REDIS_CHANNEL; diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts new file mode 100644 index 0000000000..5335c4b04e --- /dev/null +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -0,0 +1,60 @@ +import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; +import { Service } from 'typedi'; + +import config from '@/config'; +import { Logger } from '@/logger'; +import { RedisClientService } from '@/services/redis/redis-client.service'; + +import type { ScalingPubSubChannel } from './pubsub.types'; + +/** + * Responsible for subscribing to the pubsub channels used by scaling mode. + */ +@Service() +export class Subscriber { + private readonly client: SingleNodeClient | MultiNodeClient; + + // #region Lifecycle + + constructor( + private readonly logger: Logger, + private readonly redisClientService: RedisClientService, + ) { + // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. + if (config.getEnv('executions.mode') !== 'queue') return; + + this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); + + this.client.on('error', (error) => this.logger.error(error.message)); + } + + getClient() { + return this.client; + } + + // @TODO: Use `@OnShutdown()` decorator + shutdown() { + this.client.disconnect(); + } + + // #endregion + + // #region Subscribing + + async subscribe(channel: ScalingPubSubChannel) { + await this.client.subscribe(channel, (error) => { + if (error) { + this.logger.error('Failed to subscribe to channel', { channel, cause: error }); + return; + } + + this.logger.debug('Subscribed to channel', { channel }); + }); + } + + addMessageHandler(handlerFn: (channel: string, msg: string) => void) { + this.client.on('message', handlerFn); + } + + // #endregion +} diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index 493453d308..b5c6c6a80c 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -16,11 +16,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o import { OrchestrationService } from '@/services/orchestration.service'; import { RedisClientService } from '@/services/redis/redis-client.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/redis-service-commands'; -import { RedisService } from '@/services/redis.service'; import { mockInstance } from '@test/mocking'; import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types'; +config.set('executions.mode', 'queue'); +config.set('generic.instanceType', 'main'); + const instanceSettings = Container.get(InstanceSettings); const redisClientService = mockInstance(RedisClientService); const mockRedisClient = mock(); @@ -32,10 +34,6 @@ mockInstance(ActiveWorkflowManager); let queueModeId: string; -function setDefaultConfig() { - config.set('executions.mode', 'queue'); -} - const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = { senderId: 'test', workerId: 'test', @@ -47,30 +45,10 @@ const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = { describe('Orchestration Service', () => { mockInstance(Push); - mockInstance(RedisService); mockInstance(ExternalSecretsManager); const eventBus = mockInstance(MessageEventBus); beforeAll(async () => { - jest.mock('@/services/redis/redis-service-pub-sub-publisher', () => { - return jest.fn().mockImplementation(() => { - return { - init: jest.fn(), - publishToEventLog: jest.fn(), - publishToWorkerChannel: jest.fn(), - destroy: jest.fn(), - }; - }); - }); - jest.mock('@/services/redis/redis-service-pub-sub-subscriber', () => { - return jest.fn().mockImplementation(() => { - return { - subscribeToCommandChannel: jest.fn(), - destroy: jest.fn(), - }; - }); - }); - setDefaultConfig(); queueModeId = config.get('redis.queueModeId'); // @ts-expect-error readonly property @@ -82,16 +60,16 @@ describe('Orchestration Service', () => { }); afterAll(async () => { - jest.mock('@/services/redis/redis-service-pub-sub-publisher').restoreAllMocks(); - jest.mock('@/services/redis/redis-service-pub-sub-subscriber').restoreAllMocks(); await os.shutdown(); }); test('should initialize', async () => { await os.init(); await handler.init(); - expect(os.redisPublisher).toBeDefined(); - expect(handler.redisSubscriber).toBeDefined(); + // @ts-expect-error Private field + expect(os.publisher).toBeDefined(); + // @ts-expect-error Private field + expect(handler.subscriber).toBeDefined(); expect(queueModeId).toBeDefined(); }); @@ -126,15 +104,16 @@ describe('Orchestration Service', () => { }); test('should send command messages', async () => { - setDefaultConfig(); - jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockImplementation(async () => {}); + // @ts-expect-error Private field + jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {}); await os.getWorkerIds(); - expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); - jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); + // @ts-expect-error Private field + expect(os.publisher.publishCommand).toHaveBeenCalled(); + // @ts-expect-error Private field + jest.spyOn(os.publisher, 'publishCommand').mockRestore(); }); test('should prevent receiving commands too often', async () => { - setDefaultConfig(); jest.spyOn(helpers, 'debounceMessageReceiver'); const res1 = await handleCommandMessageMain( JSON.stringify({ diff --git a/packages/cli/src/services/__tests__/redis.service.test.ts b/packages/cli/src/services/__tests__/redis.service.test.ts deleted file mode 100644 index 9990b58d6a..0000000000 --- a/packages/cli/src/services/__tests__/redis.service.test.ts +++ /dev/null @@ -1,57 +0,0 @@ -import Container from 'typedi'; - -import config from '@/config'; -import { Logger } from '@/logger'; -import { RedisService } from '@/services/redis.service'; -import { mockInstance } from '@test/mocking'; - -jest.mock('ioredis', () => { - const Redis = require('ioredis-mock'); - if (typeof Redis === 'object') { - // the first mock is an ioredis shim because ioredis-mock depends on it - // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 - return { - Command: { _transformer: { argument: {}, reply: {} } }, - }; - } - // second mock for our code - return function (...args: unknown[]) { - return new Redis(args); - }; -}); - -mockInstance(Logger); -const redisService = Container.get(RedisService); - -function setDefaultConfig() { - config.set('executions.mode', 'queue'); -} - -const PUBSUB_CHANNEL = 'testchannel'; - -describe('RedisService', () => { - beforeAll(async () => { - setDefaultConfig(); - }); - - test('should create pubsub publisher and subscriber with handler', async () => { - const pub = await redisService.getPubSubPublisher(); - const sub = await redisService.getPubSubSubscriber(); - expect(pub).toBeDefined(); - expect(sub).toBeDefined(); - - const mockHandler = jest.fn(); - mockHandler.mockImplementation((_channel: string, _message: string) => {}); - sub.addMessageHandler(PUBSUB_CHANNEL, mockHandler); - await sub.subscribe(PUBSUB_CHANNEL); - await pub.publish(PUBSUB_CHANNEL, 'test'); - await new Promise((resolve) => - setTimeout(async () => { - resolve(0); - }, 50), - ); - expect(mockHandler).toHaveBeenCalled(); - await sub.destroy(); - await pub.destroy(); - }); -}); diff --git a/packages/cli/src/services/orchestration.handler.base.service.ts b/packages/cli/src/services/orchestration.handler.base.service.ts index d2f0a2de5d..e994ff6308 100644 --- a/packages/cli/src/services/orchestration.handler.base.service.ts +++ b/packages/cli/src/services/orchestration.handler.base.service.ts @@ -1,21 +1,9 @@ -import Container from 'typedi'; - import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types'; import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types'; -import type { RedisServicePubSubSubscriber } from './redis/redis-service-pub-sub-subscriber'; -import { RedisService } from './redis.service'; export abstract class OrchestrationHandlerService { protected initialized = false; - redisSubscriber: RedisServicePubSubSubscriber; - - readonly redisService: RedisService; - - constructor() { - this.redisService = Container.get(RedisService); - } - async init() { await this.initSubscriber(); this.initialized = true; @@ -29,7 +17,6 @@ export abstract class OrchestrationHandlerService { } async shutdown() { - await this.redisSubscriber?.destroy(); this.initialized = false; } diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index b95191efb8..8e4963070d 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -1,24 +1,27 @@ import { InstanceSettings } from 'n8n-core'; import type { WorkflowActivateMode } from 'n8n-workflow'; -import { Service } from 'typedi'; +import Container, { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/logger'; +import type { Publisher } from '@/scaling/pubsub/publisher.service'; +import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/redis-service-commands'; -import type { RedisServicePubSubPublisher } from './redis/redis-service-pub-sub-publisher'; -import { RedisService } from './redis.service'; @Service() export class OrchestrationService { constructor( private readonly logger: Logger, - protected readonly instanceSettings: InstanceSettings, - private readonly redisService: RedisService, + readonly instanceSettings: InstanceSettings, readonly multiMainSetup: MultiMainSetup, ) {} + private publisher: Publisher; + + private subscriber: Subscriber; + protected isInitialized = false; private isMultiMainSetupLicensed = false; @@ -40,8 +43,6 @@ export class OrchestrationService { return !this.isMultiMainSetupEnabled; } - redisPublisher: RedisServicePubSubPublisher; - get instanceId() { return config.getEnv('redis.queueModeId'); } @@ -63,7 +64,13 @@ export class OrchestrationService { async init() { if (this.isInitialized) return; - if (config.get('executions.mode') === 'queue') await this.initPublisher(); + if (config.get('executions.mode') === 'queue') { + const { Publisher } = await import('@/scaling/pubsub/publisher.service'); + this.publisher = Container.get(Publisher); + + const { Subscriber } = await import('@/scaling/pubsub/subscriber.service'); + this.subscriber = Container.get(Subscriber); + } if (this.isMultiMainSetupEnabled) { await this.multiMainSetup.init(); @@ -74,12 +81,14 @@ export class OrchestrationService { this.isInitialized = true; } + // @TODO: Use `@OnShutdown()` decorator async shutdown() { if (!this.isInitialized) return; if (this.isMultiMainSetupEnabled) await this.multiMainSetup.shutdown(); - await this.redisPublisher.destroy(); + this.publisher.shutdown(); + this.subscriber.shutdown(); this.isInitialized = false; } @@ -88,10 +97,6 @@ export class OrchestrationService { // pubsub // ---------------------------------- - protected async initPublisher() { - this.redisPublisher = await this.redisService.getPubSubPublisher(); - } - async publish(command: RedisServiceCommand, data?: unknown) { if (!this.sanityCheck()) return; @@ -99,7 +104,7 @@ export class OrchestrationService { this.logger.debug(`[Instance ID ${this.instanceId}] Publishing command "${command}"`, payload); - await this.redisPublisher.publishToCommandChannel({ command, payload }); + await this.publisher.publishCommand({ command, payload }); } // ---------------------------------- @@ -113,7 +118,7 @@ export class OrchestrationService { this.logger.debug(`Sending "${command}" to command channel`); - await this.redisPublisher.publishToCommandChannel({ + await this.publisher.publishCommand({ command, targets: id ? [id] : undefined, }); @@ -126,7 +131,7 @@ export class OrchestrationService { this.logger.debug(`Sending "${command}" to command channel`); - await this.redisPublisher.publishToCommandChannel({ command }); + await this.publisher.publishCommand({ command }); } // ---------------------------------- diff --git a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts index aa0b02ffc2..a03389ce15 100644 --- a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts +++ b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts @@ -5,8 +5,8 @@ import { Service } from 'typedi'; import config from '@/config'; import { TIME } from '@/constants'; import { Logger } from '@/logger'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { RedisClientService } from '@/services/redis/redis-client.service'; -import { RedisServicePubSubPublisher } from '@/services/redis/redis-service-pub-sub-publisher'; import { TypedEmitter } from '@/typed-emitter'; type MultiMainEvents = { @@ -19,7 +19,7 @@ export class MultiMainSetup extends TypedEmitter { constructor( private readonly logger: Logger, private readonly instanceSettings: InstanceSettings, - private readonly redisPublisher: RedisServicePubSubPublisher, + private readonly publisher: Publisher, private readonly redisClientService: RedisClientService, ) { super(); @@ -52,16 +52,16 @@ export class MultiMainSetup extends TypedEmitter { const { isLeader } = this.instanceSettings; - if (isLeader) await this.redisPublisher.clear(this.leaderKey); + if (isLeader) await this.publisher.clear(this.leaderKey); } private async checkLeader() { - const leaderId = await this.redisPublisher.get(this.leaderKey); + const leaderId = await this.publisher.get(this.leaderKey); if (leaderId === this.instanceId) { this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`); - await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); + await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); return; } @@ -98,17 +98,14 @@ export class MultiMainSetup extends TypedEmitter { private async tryBecomeLeader() { // this can only succeed if leadership is currently vacant - const keySetSuccessfully = await this.redisPublisher.setIfNotExists( - this.leaderKey, - this.instanceId, - ); + const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.instanceId); if (keySetSuccessfully) { this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`); this.instanceSettings.markAsLeader(); - await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); + await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); /** * Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal, queue recovery @@ -120,6 +117,6 @@ export class MultiMainSetup extends TypedEmitter { } async fetchLeaderKey() { - return await this.redisPublisher.get(this.leaderKey); + return await this.publisher.get(this.leaderKey); } } diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index fc08146aa0..0aec5c0f08 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -1,5 +1,7 @@ import { Service } from 'typedi'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; + import { handleCommandMessageMain } from './handle-command-message-main'; import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main'; import type { MainResponseReceivedHandlerOptions } from './types'; @@ -8,21 +10,20 @@ import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redi @Service() export class OrchestrationHandlerMainService extends OrchestrationHandlerService { + constructor(private readonly subscriber: Subscriber) { + super(); + } + async initSubscriber(options: MainResponseReceivedHandlerOptions) { - this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + await this.subscriber.subscribe('n8n.commands'); + await this.subscriber.subscribe('n8n.worker-response'); - await this.redisSubscriber.subscribeToCommandChannel(); - await this.redisSubscriber.subscribeToWorkerResponseChannel(); - - this.redisSubscriber.addMessageHandler( - 'OrchestrationMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessageMain(messageString, options); - } else if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessageMain(messageString); - } - }, - ); + this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { + if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await handleWorkerResponseMessageMain(messageString, options); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await handleCommandMessageMain(messageString); + } + }); } } diff --git a/packages/cli/src/services/orchestration/main/types.ts b/packages/cli/src/services/orchestration/main/types.ts index 57992f8221..7388a55032 100644 --- a/packages/cli/src/services/orchestration/main/types.ts +++ b/packages/cli/src/services/orchestration/main/types.ts @@ -1,6 +1,6 @@ -import type { RedisServicePubSubPublisher } from '@/services/redis/redis-service-pub-sub-publisher'; +import type { Publisher } from '@/scaling/pubsub/publisher.service'; export type MainResponseReceivedHandlerOptions = { queueModeId: string; - redisPublisher: RedisServicePubSubPublisher; + publisher: Publisher; }; diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts index d1c778697d..5ecef9ade6 100644 --- a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts +++ b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts @@ -1,23 +1,24 @@ import { Service } from 'typedi'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; + import { handleCommandMessageWebhook } from './handle-command-message-webhook'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; import { COMMAND_REDIS_CHANNEL } from '../../redis/redis-constants'; @Service() export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService { + constructor(private readonly subscriber: Subscriber) { + super(); + } + async initSubscriber() { - this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + await this.subscriber.subscribe('n8n.commands'); - await this.redisSubscriber.subscribeToCommandChannel(); - - this.redisSubscriber.addMessageHandler( - 'OrchestrationMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessageWebhook(messageString); - } - }, - ); + this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { + if (channel === COMMAND_REDIS_CHANNEL) { + await handleCommandMessageWebhook(messageString); + } + }); } } diff --git a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts index 2bc46745c8..13cb8ceaff 100644 --- a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts +++ b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts @@ -39,7 +39,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa switch (message.command) { case 'getStatus': if (!debounceMessageReceiver(message, 500)) return; - await options.redisPublisher.publishToWorkerChannel({ + await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, command: 'getStatus', payload: { @@ -66,7 +66,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa break; case 'getId': if (!debounceMessageReceiver(message, 500)) return; - await options.redisPublisher.publishToWorkerChannel({ + await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, command: 'getId', }); @@ -75,7 +75,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa if (!debounceMessageReceiver(message, 500)) return; try { await Container.get(MessageEventBus).restart(); - await options.redisPublisher.publishToWorkerChannel({ + await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, command: 'restartEventBus', payload: { @@ -83,7 +83,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa }, }); } catch (error) { - await options.redisPublisher.publishToWorkerChannel({ + await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, command: 'restartEventBus', payload: { @@ -97,7 +97,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa if (!debounceMessageReceiver(message, 500)) return; try { await Container.get(ExternalSecretsManager).reloadAllProviders(); - await options.redisPublisher.publishToWorkerChannel({ + await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, command: 'reloadExternalSecretsProviders', payload: { @@ -105,7 +105,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa }, }); } catch (error) { - await options.redisPublisher.publishToWorkerChannel({ + await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, command: 'reloadExternalSecretsProviders', payload: { diff --git a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts index 3e91ca1655..a1356df5de 100644 --- a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts +++ b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts @@ -1,18 +1,19 @@ import { Service } from 'typedi'; +import { Subscriber } from '@/scaling/pubsub/subscriber.service'; + import { getWorkerCommandReceivedHandler } from './handle-command-message-worker'; import type { WorkerCommandReceivedHandlerOptions } from './types'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; @Service() export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService { - async initSubscriber(options: WorkerCommandReceivedHandlerOptions) { - this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + constructor(private readonly subscriber: Subscriber) { + super(); + } - await this.redisSubscriber.subscribeToCommandChannel(); - this.redisSubscriber.addMessageHandler( - 'WorkerCommandReceivedHandler', - getWorkerCommandReceivedHandler(options), - ); + async initSubscriber(options: WorkerCommandReceivedHandlerOptions) { + await this.subscriber.subscribe('n8n.commands'); + this.subscriber.addMessageHandler(getWorkerCommandReceivedHandler(options)); } } diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index 370f7a1ec3..df500ee3c1 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -1,11 +1,11 @@ import type { RunningJobSummary } from '@n8n/api-types'; import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; -import type { RedisServicePubSubPublisher } from '../../redis/redis-service-pub-sub-publisher'; +import type { Publisher } from '@/scaling/pubsub/publisher.service'; export interface WorkerCommandReceivedHandlerOptions { queueModeId: string; - redisPublisher: RedisServicePubSubPublisher; + publisher: Publisher; getRunningJobIds: () => Array; getRunningJobsSummary: () => RunningJobSummary[]; } diff --git a/packages/cli/src/services/redis.service.ts b/packages/cli/src/services/redis.service.ts deleted file mode 100644 index 22692f3637..0000000000 --- a/packages/cli/src/services/redis.service.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { Service } from 'typedi'; - -import { RedisServicePubSubPublisher } from './redis/redis-service-pub-sub-publisher'; -import { RedisServicePubSubSubscriber } from './redis/redis-service-pub-sub-subscriber'; - -/* - * This is a convenience service that provides access to all the Redis clients. - */ -@Service() -export class RedisService { - constructor( - private redisServicePubSubSubscriber: RedisServicePubSubSubscriber, - private redisServicePubSubPublisher: RedisServicePubSubPublisher, - ) {} - - async getPubSubSubscriber() { - await this.redisServicePubSubSubscriber.init(); - return this.redisServicePubSubSubscriber; - } - - async getPubSubPublisher() { - await this.redisServicePubSubPublisher.init(); - return this.redisServicePubSubPublisher; - } -} diff --git a/packages/cli/src/services/redis/redis-service-commands.ts b/packages/cli/src/services/redis/redis-service-commands.ts index 71ebd3dee5..e64d1e97fc 100644 --- a/packages/cli/src/services/redis/redis-service-commands.ts +++ b/packages/cli/src/services/redis/redis-service-commands.ts @@ -21,7 +21,7 @@ export type RedisServiceCommand = | 'clear-test-webhooks'; // multi-main only /** - * An object to be sent via Redis pub/sub from the main process to the workers. + * An object to be sent via Redis pubsub from the main process to the workers. * @field command: The command to be executed. * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. * @field payload: Optional arguments to be sent with the command. diff --git a/packages/cli/src/services/redis/redis-service-pub-sub-publisher.ts b/packages/cli/src/services/redis/redis-service-pub-sub-publisher.ts deleted file mode 100644 index 367591b4cd..0000000000 --- a/packages/cli/src/services/redis/redis-service-pub-sub-publisher.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { Service } from 'typedi'; - -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis-constants'; -import { RedisServiceBaseSender } from './redis-service-base-classes'; -import type { - RedisServiceCommandObject, - RedisServiceWorkerResponseObject, -} from './redis-service-commands'; - -@Service() -export class RedisServicePubSubPublisher extends RedisServiceBaseSender { - async init(): Promise { - await super.init('publisher(n8n)'); - } - - async publish(channel: string, message: string): Promise { - if (!this.redisClient) { - await this.init(); - } - await this.redisClient?.publish(channel, message); - } - - async publishToCommandChannel( - message: Omit, - ): Promise { - const messageWithSenderId = message as RedisServiceCommandObject; - messageWithSenderId.senderId = this.senderId; - await this.publish(COMMAND_REDIS_CHANNEL, JSON.stringify(messageWithSenderId)); - } - - async publishToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise { - await this.publish(WORKER_RESPONSE_REDIS_CHANNEL, JSON.stringify(message)); - } - - async setIfNotExists(key: string, value: string) { - if (!this.redisClient) await this.init(); - - const success = await this.redisClient?.setnx(key, value); - - return !!success; - } - - async setExpiration(key: string, ttl: number) { - if (!this.redisClient) await this.init(); - - await this.redisClient?.expire(key, ttl); - } - - async get(key: string) { - if (!this.redisClient) await this.init(); - - return await this.redisClient?.get(key); - } - - async clear(key: string) { - if (!this.redisClient) await this.init(); - - await this.redisClient?.del(key); - } -} diff --git a/packages/cli/src/services/redis/redis-service-pub-sub-subscriber.ts b/packages/cli/src/services/redis/redis-service-pub-sub-subscriber.ts deleted file mode 100644 index 52bc0b225f..0000000000 --- a/packages/cli/src/services/redis/redis-service-pub-sub-subscriber.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { Service } from 'typedi'; - -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis-constants'; -import { RedisServiceBaseReceiver } from './redis-service-base-classes'; - -@Service() -export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { - async init(): Promise { - await super.init('subscriber(n8n)'); - - this.redisClient?.on('message', (channel: string, message: string) => { - this.messageHandlers.forEach((handler: (channel: string, message: string) => void) => - handler(channel, message), - ); - }); - } - - async subscribe(channel: string): Promise { - if (!this.redisClient) { - await this.init(); - } - await this.redisClient?.subscribe(channel, (error, _count: number) => { - if (error) { - this.logger.error(`Error subscribing to channel ${channel}`); - } else { - this.logger.debug(`Subscribed Redis PubSub client to channel: ${channel}`); - } - }); - } - - async unsubscribe(channel: string): Promise { - if (!this.redisClient) { - return; - } - await this.redisClient?.unsubscribe(channel, (error, _count: number) => { - if (error) { - this.logger.error(`Error unsubscribing from channel ${channel}`); - } else { - this.logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`); - } - }); - } - - async subscribeToCommandChannel(): Promise { - await this.subscribe(COMMAND_REDIS_CHANNEL); - } - - async subscribeToWorkerResponseChannel(): Promise { - await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); - } - - async unSubscribeFromCommandChannel(): Promise { - await this.unsubscribe(COMMAND_REDIS_CHANNEL); - } - - async unSubscribeFromWorkerResponseChannel(): Promise { - await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL); - } -}