refactor(core): Flatten Redis pubsub class hierarchy (no-changelog) (#10616)

This commit is contained in:
Iván Ovejero 2024-09-17 15:45:42 +02:00 committed by GitHub
parent c55df63abc
commit aa00d9c2ae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 392 additions and 335 deletions

View file

@ -11,7 +11,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup);
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
const execution = mock<IExecutionResponse>({
id: '123',

View file

@ -21,6 +21,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { Server } from '@/server';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { OrchestrationService } from '@/services/orchestration.service';
@ -240,7 +241,7 @@ export class Start extends BaseCommand {
await Container.get(OrchestrationHandlerMainService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationService).redisPublisher,
publisher: Container.get(Publisher),
});
if (!orchestrationService.isMultiMainSetupEnabled) return;

View file

@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import type { ScalingService } from '@/scaling/scaling.service';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { RedisServicePubSubSubscriber } from '@/services/redis/redis-service-pub-sub-subscriber';
import { BaseCommand } from './base-command';
@ -40,8 +40,6 @@ export class Worker extends BaseCommand {
jobProcessor: JobProcessor;
redisSubscriber: RedisServicePubSubSubscriber;
override needsCommunityPackages = true;
/**
@ -131,7 +129,7 @@ export class Worker extends BaseCommand {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationHandlerWorkerService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
publisher: Container.get(Publisher),
getRunningJobIds: () => this.jobProcessor.getRunningJobIds(),
getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(),
});

View file

@ -18,8 +18,6 @@ import {
UNLIMITED_LICENSE_QUOTA,
} from './constants';
import type { BooleanLicenseFeature, NumericLicenseFeature } from './interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/redis-service-pub-sub-publisher';
import { RedisService } from './services/redis.service';
export type FeatureReturnType = Partial<
{
@ -31,8 +29,6 @@ export type FeatureReturnType = Partial<
export class License {
private manager: LicenseManager | undefined;
private redisPublisher: RedisServicePubSubPublisher;
private isShuttingDown = false;
constructor(
@ -163,13 +159,8 @@ export class License {
}
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
const { Publisher } = await import('@/scaling/pubsub/publisher.service');
await Container.get(Publisher).publishCommand({ command: 'reloadLicense' });
}
const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';

View file

@ -0,0 +1,75 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';
import { Publisher } from '../pubsub/publisher.service';
describe('Publisher', () => {
let queueModeId: string;
beforeEach(() => {
config.set('executions.mode', 'queue');
queueModeId = generateNanoId();
config.set('redis.queueModeId', queueModeId);
});
const client = mock<SingleNodeClient>();
const redisClientService = mock<RedisClientService>({ createClient: () => client });
describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(mock(), redisClientService);
expect(publisher.getClient()).toEqual(client);
});
it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const publisher = new Publisher(mock(), redisClientService);
expect(publisher.getClient()).toBeUndefined();
});
});
describe('shutdown', () => {
it('should disconnect Redis client', () => {
const publisher = new Publisher(mock(), redisClientService);
publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});
describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceCommandObject>({ command: 'reloadLicense' });
await publisher.publishCommand(msg);
expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId }),
);
});
});
describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceWorkerResponseObject>({
command: 'reloadExternalSecretsProviders',
});
await publisher.publishWorkerResponse(msg);
expect(client.publish).toHaveBeenCalledWith('n8n.worker-response', JSON.stringify(msg));
});
});
});

View file

@ -0,0 +1,60 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import config from '@/config';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import { Subscriber } from '../pubsub/subscriber.service';
describe('Subscriber', () => {
beforeEach(() => {
config.set('executions.mode', 'queue');
});
const client = mock<SingleNodeClient>();
const redisClientService = mock<RedisClientService>({ createClient: () => client });
describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService);
expect(subscriber.getClient()).toEqual(client);
});
it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService);
expect(subscriber.getClient()).toBeUndefined();
});
});
describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService);
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});
describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService);
await subscriber.subscribe('n8n.commands');
expect(client.subscribe).toHaveBeenCalledWith('n8n.commands', expect.any(Function));
});
});
describe('setHandler', () => {
it('should set handler function', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const handlerFn = jest.fn();
subscriber.addMessageHandler(handlerFn);
expect(client.on).toHaveBeenCalledWith('message', handlerFn);
});
});
});

View file

@ -0,0 +1,88 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';
/**
* Responsible for publishing messages into the pubsub channels used by scaling mode.
*/
@Service()
export class Publisher {
private readonly client: SingleNodeClient | MultiNodeClient;
// #region Lifecycle
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' });
this.client.on('error', (error) => this.logger.error(error.message));
}
getClient() {
return this.client;
}
// @TODO: Use `@OnShutdown()` decorator
shutdown() {
this.client.disconnect();
}
// #endregion
// #region Publishing
/** Publish a command into the `n8n.commands` channel. */
async publishCommand(msg: Omit<RedisServiceCommandObject, 'senderId'>) {
await this.client.publish(
'n8n.commands',
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
);
this.logger.debug(`Published ${msg.command} to command channel`);
}
/** Publish a response for a command into the `n8n.worker-response` channel. */
async publishWorkerResponse(msg: RedisServiceWorkerResponseObject) {
await this.client.publish('n8n.worker-response', JSON.stringify(msg));
this.logger.debug(`Published response for ${msg.command} to worker response channel`);
}
// #endregion
// #region Utils for multi-main setup
// @TODO: The following methods are not pubsub-specific. Consider a dedicated client for multi-main setup.
async setIfNotExists(key: string, value: string) {
const success = await this.client.setnx(key, value);
return !!success;
}
async setExpiration(key: string, ttl: number) {
await this.client.expire(key, ttl);
}
async get(key: string) {
return await this.client.get(key);
}
async clear(key: string) {
await this.client?.del(key);
}
// #endregion
}

View file

@ -0,0 +1,14 @@
import type {
COMMAND_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from '@/services/redis/redis-constants';
/**
* Pubsub channel used by scaling mode:
*
* - `n8n.commands` for messages sent by a main process to command workers or other main processes
* - `n8n.worker-response` for messages sent by workers in response to commands from main processes
*/
export type ScalingPubSubChannel =
| typeof COMMAND_REDIS_CHANNEL
| typeof WORKER_RESPONSE_REDIS_CHANNEL;

View file

@ -0,0 +1,60 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type { ScalingPubSubChannel } from './pubsub.types';
/**
* Responsible for subscribing to the pubsub channels used by scaling mode.
*/
@Service()
export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient;
// #region Lifecycle
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });
this.client.on('error', (error) => this.logger.error(error.message));
}
getClient() {
return this.client;
}
// @TODO: Use `@OnShutdown()` decorator
shutdown() {
this.client.disconnect();
}
// #endregion
// #region Subscribing
async subscribe(channel: ScalingPubSubChannel) {
await this.client.subscribe(channel, (error) => {
if (error) {
this.logger.error('Failed to subscribe to channel', { channel, cause: error });
return;
}
this.logger.debug('Subscribed to channel', { channel });
});
}
addMessageHandler(handlerFn: (channel: string, msg: string) => void) {
this.client.on('message', handlerFn);
}
// #endregion
}

View file

@ -16,11 +16,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o
import { OrchestrationService } from '@/services/orchestration.service';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/redis-service-commands';
import { RedisService } from '@/services/redis.service';
import { mockInstance } from '@test/mocking';
import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types';
config.set('executions.mode', 'queue');
config.set('generic.instanceType', 'main');
const instanceSettings = Container.get(InstanceSettings);
const redisClientService = mockInstance(RedisClientService);
const mockRedisClient = mock<Redis>();
@ -32,10 +34,6 @@ mockInstance(ActiveWorkflowManager);
let queueModeId: string;
function setDefaultConfig() {
config.set('executions.mode', 'queue');
}
const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = {
senderId: 'test',
workerId: 'test',
@ -47,30 +45,10 @@ const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = {
describe('Orchestration Service', () => {
mockInstance(Push);
mockInstance(RedisService);
mockInstance(ExternalSecretsManager);
const eventBus = mockInstance(MessageEventBus);
beforeAll(async () => {
jest.mock('@/services/redis/redis-service-pub-sub-publisher', () => {
return jest.fn().mockImplementation(() => {
return {
init: jest.fn(),
publishToEventLog: jest.fn(),
publishToWorkerChannel: jest.fn(),
destroy: jest.fn(),
};
});
});
jest.mock('@/services/redis/redis-service-pub-sub-subscriber', () => {
return jest.fn().mockImplementation(() => {
return {
subscribeToCommandChannel: jest.fn(),
destroy: jest.fn(),
};
});
});
setDefaultConfig();
queueModeId = config.get('redis.queueModeId');
// @ts-expect-error readonly property
@ -82,16 +60,16 @@ describe('Orchestration Service', () => {
});
afterAll(async () => {
jest.mock('@/services/redis/redis-service-pub-sub-publisher').restoreAllMocks();
jest.mock('@/services/redis/redis-service-pub-sub-subscriber').restoreAllMocks();
await os.shutdown();
});
test('should initialize', async () => {
await os.init();
await handler.init();
expect(os.redisPublisher).toBeDefined();
expect(handler.redisSubscriber).toBeDefined();
// @ts-expect-error Private field
expect(os.publisher).toBeDefined();
// @ts-expect-error Private field
expect(handler.subscriber).toBeDefined();
expect(queueModeId).toBeDefined();
});
@ -126,15 +104,16 @@ describe('Orchestration Service', () => {
});
test('should send command messages', async () => {
setDefaultConfig();
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockImplementation(async () => {});
// @ts-expect-error Private field
jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {});
await os.getWorkerIds();
expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled();
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore();
// @ts-expect-error Private field
expect(os.publisher.publishCommand).toHaveBeenCalled();
// @ts-expect-error Private field
jest.spyOn(os.publisher, 'publishCommand').mockRestore();
});
test('should prevent receiving commands too often', async () => {
setDefaultConfig();
jest.spyOn(helpers, 'debounceMessageReceiver');
const res1 = await handleCommandMessageMain(
JSON.stringify({

View file

@ -1,57 +0,0 @@
import Container from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisService } from '@/services/redis.service';
import { mockInstance } from '@test/mocking';
jest.mock('ioredis', () => {
const Redis = require('ioredis-mock');
if (typeof Redis === 'object') {
// the first mock is an ioredis shim because ioredis-mock depends on it
// https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111
return {
Command: { _transformer: { argument: {}, reply: {} } },
};
}
// second mock for our code
return function (...args: unknown[]) {
return new Redis(args);
};
});
mockInstance(Logger);
const redisService = Container.get(RedisService);
function setDefaultConfig() {
config.set('executions.mode', 'queue');
}
const PUBSUB_CHANNEL = 'testchannel';
describe('RedisService', () => {
beforeAll(async () => {
setDefaultConfig();
});
test('should create pubsub publisher and subscriber with handler', async () => {
const pub = await redisService.getPubSubPublisher();
const sub = await redisService.getPubSubSubscriber();
expect(pub).toBeDefined();
expect(sub).toBeDefined();
const mockHandler = jest.fn();
mockHandler.mockImplementation((_channel: string, _message: string) => {});
sub.addMessageHandler(PUBSUB_CHANNEL, mockHandler);
await sub.subscribe(PUBSUB_CHANNEL);
await pub.publish(PUBSUB_CHANNEL, 'test');
await new Promise((resolve) =>
setTimeout(async () => {
resolve(0);
}, 50),
);
expect(mockHandler).toHaveBeenCalled();
await sub.destroy();
await pub.destroy();
});
});

View file

@ -1,21 +1,9 @@
import Container from 'typedi';
import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
import type { RedisServicePubSubSubscriber } from './redis/redis-service-pub-sub-subscriber';
import { RedisService } from './redis.service';
export abstract class OrchestrationHandlerService {
protected initialized = false;
redisSubscriber: RedisServicePubSubSubscriber;
readonly redisService: RedisService;
constructor() {
this.redisService = Container.get(RedisService);
}
async init() {
await this.initSubscriber();
this.initialized = true;
@ -29,7 +17,6 @@ export abstract class OrchestrationHandlerService {
}
async shutdown() {
await this.redisSubscriber?.destroy();
this.initialized = false;
}

View file

@ -1,24 +1,27 @@
import { InstanceSettings } from 'n8n-core';
import type { WorkflowActivateMode } from 'n8n-workflow';
import { Service } from 'typedi';
import Container, { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee';
import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/redis-service-commands';
import type { RedisServicePubSubPublisher } from './redis/redis-service-pub-sub-publisher';
import { RedisService } from './redis.service';
@Service()
export class OrchestrationService {
constructor(
private readonly logger: Logger,
protected readonly instanceSettings: InstanceSettings,
private readonly redisService: RedisService,
readonly instanceSettings: InstanceSettings,
readonly multiMainSetup: MultiMainSetup,
) {}
private publisher: Publisher;
private subscriber: Subscriber;
protected isInitialized = false;
private isMultiMainSetupLicensed = false;
@ -40,8 +43,6 @@ export class OrchestrationService {
return !this.isMultiMainSetupEnabled;
}
redisPublisher: RedisServicePubSubPublisher;
get instanceId() {
return config.getEnv('redis.queueModeId');
}
@ -63,7 +64,13 @@ export class OrchestrationService {
async init() {
if (this.isInitialized) return;
if (config.get('executions.mode') === 'queue') await this.initPublisher();
if (config.get('executions.mode') === 'queue') {
const { Publisher } = await import('@/scaling/pubsub/publisher.service');
this.publisher = Container.get(Publisher);
const { Subscriber } = await import('@/scaling/pubsub/subscriber.service');
this.subscriber = Container.get(Subscriber);
}
if (this.isMultiMainSetupEnabled) {
await this.multiMainSetup.init();
@ -74,12 +81,14 @@ export class OrchestrationService {
this.isInitialized = true;
}
// @TODO: Use `@OnShutdown()` decorator
async shutdown() {
if (!this.isInitialized) return;
if (this.isMultiMainSetupEnabled) await this.multiMainSetup.shutdown();
await this.redisPublisher.destroy();
this.publisher.shutdown();
this.subscriber.shutdown();
this.isInitialized = false;
}
@ -88,10 +97,6 @@ export class OrchestrationService {
// pubsub
// ----------------------------------
protected async initPublisher() {
this.redisPublisher = await this.redisService.getPubSubPublisher();
}
async publish(command: RedisServiceCommand, data?: unknown) {
if (!this.sanityCheck()) return;
@ -99,7 +104,7 @@ export class OrchestrationService {
this.logger.debug(`[Instance ID ${this.instanceId}] Publishing command "${command}"`, payload);
await this.redisPublisher.publishToCommandChannel({ command, payload });
await this.publisher.publishCommand({ command, payload });
}
// ----------------------------------
@ -113,7 +118,7 @@ export class OrchestrationService {
this.logger.debug(`Sending "${command}" to command channel`);
await this.redisPublisher.publishToCommandChannel({
await this.publisher.publishCommand({
command,
targets: id ? [id] : undefined,
});
@ -126,7 +131,7 @@ export class OrchestrationService {
this.logger.debug(`Sending "${command}" to command channel`);
await this.redisPublisher.publishToCommandChannel({ command });
await this.publisher.publishCommand({ command });
}
// ----------------------------------

View file

@ -5,8 +5,8 @@ import { Service } from 'typedi';
import config from '@/config';
import { TIME } from '@/constants';
import { Logger } from '@/logger';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { RedisClientService } from '@/services/redis/redis-client.service';
import { RedisServicePubSubPublisher } from '@/services/redis/redis-service-pub-sub-publisher';
import { TypedEmitter } from '@/typed-emitter';
type MultiMainEvents = {
@ -19,7 +19,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly redisPublisher: RedisServicePubSubPublisher,
private readonly publisher: Publisher,
private readonly redisClientService: RedisClientService,
) {
super();
@ -52,16 +52,16 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
const { isLeader } = this.instanceSettings;
if (isLeader) await this.redisPublisher.clear(this.leaderKey);
if (isLeader) await this.publisher.clear(this.leaderKey);
}
private async checkLeader() {
const leaderId = await this.redisPublisher.get(this.leaderKey);
const leaderId = await this.publisher.get(this.leaderKey);
if (leaderId === this.instanceId) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`);
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
return;
}
@ -98,17 +98,14 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
private async tryBecomeLeader() {
// this can only succeed if leadership is currently vacant
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(
this.leaderKey,
this.instanceId,
);
const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.instanceId);
if (keySetSuccessfully) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`);
this.instanceSettings.markAsLeader();
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
/**
* Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal, queue recovery
@ -120,6 +117,6 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
}
async fetchLeaderKey() {
return await this.redisPublisher.get(this.leaderKey);
return await this.publisher.get(this.leaderKey);
}
}

View file

@ -1,5 +1,7 @@
import { Service } from 'typedi';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageMain } from './handle-command-message-main';
import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main';
import type { MainResponseReceivedHandlerOptions } from './types';
@ -8,21 +10,20 @@ import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redi
@Service()
export class OrchestrationHandlerMainService extends OrchestrationHandlerService {
constructor(private readonly subscriber: Subscriber) {
super();
}
async initSubscriber(options: MainResponseReceivedHandlerOptions) {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.subscriber.subscribe('n8n.commands');
await this.subscriber.subscribe('n8n.worker-response');
await this.redisSubscriber.subscribeToCommandChannel();
await this.redisSubscriber.subscribeToWorkerResponseChannel();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageMain(messageString);
}
},
);
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageMain(messageString);
}
});
}
}

View file

@ -1,6 +1,6 @@
import type { RedisServicePubSubPublisher } from '@/services/redis/redis-service-pub-sub-publisher';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
export type MainResponseReceivedHandlerOptions = {
queueModeId: string;
redisPublisher: RedisServicePubSubPublisher;
publisher: Publisher;
};

View file

@ -1,23 +1,24 @@
import { Service } from 'typedi';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageWebhook } from './handle-command-message-webhook';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import { COMMAND_REDIS_CHANNEL } from '../../redis/redis-constants';
@Service()
export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService {
constructor(private readonly subscriber: Subscriber) {
super();
}
async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.subscriber.subscribe('n8n.commands');
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageWebhook(messageString);
}
},
);
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageWebhook(messageString);
}
});
}
}

View file

@ -39,7 +39,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
switch (message.command) {
case 'getStatus':
if (!debounceMessageReceiver(message, 500)) return;
await options.redisPublisher.publishToWorkerChannel({
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'getStatus',
payload: {
@ -66,7 +66,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
break;
case 'getId':
if (!debounceMessageReceiver(message, 500)) return;
await options.redisPublisher.publishToWorkerChannel({
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'getId',
});
@ -75,7 +75,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(MessageEventBus).restart();
await options.redisPublisher.publishToWorkerChannel({
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
@ -83,7 +83,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
},
});
} catch (error) {
await options.redisPublisher.publishToWorkerChannel({
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
@ -97,7 +97,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.redisPublisher.publishToWorkerChannel({
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
@ -105,7 +105,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
},
});
} catch (error) {
await options.redisPublisher.publishToWorkerChannel({
await options.publisher.publishWorkerResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {

View file

@ -1,18 +1,19 @@
import { Service } from 'typedi';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { getWorkerCommandReceivedHandler } from './handle-command-message-worker';
import type { WorkerCommandReceivedHandlerOptions } from './types';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
@Service()
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
async initSubscriber(options: WorkerCommandReceivedHandlerOptions) {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
constructor(private readonly subscriber: Subscriber) {
super();
}
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'WorkerCommandReceivedHandler',
getWorkerCommandReceivedHandler(options),
);
async initSubscriber(options: WorkerCommandReceivedHandlerOptions) {
await this.subscriber.subscribe('n8n.commands');
this.subscriber.addMessageHandler(getWorkerCommandReceivedHandler(options));
}
}

View file

@ -1,11 +1,11 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { RedisServicePubSubPublisher } from '../../redis/redis-service-pub-sub-publisher';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
redisPublisher: RedisServicePubSubPublisher;
publisher: Publisher;
getRunningJobIds: () => Array<string | number>;
getRunningJobsSummary: () => RunningJobSummary[];
}

View file

@ -1,25 +0,0 @@
import { Service } from 'typedi';
import { RedisServicePubSubPublisher } from './redis/redis-service-pub-sub-publisher';
import { RedisServicePubSubSubscriber } from './redis/redis-service-pub-sub-subscriber';
/*
* This is a convenience service that provides access to all the Redis clients.
*/
@Service()
export class RedisService {
constructor(
private redisServicePubSubSubscriber: RedisServicePubSubSubscriber,
private redisServicePubSubPublisher: RedisServicePubSubPublisher,
) {}
async getPubSubSubscriber() {
await this.redisServicePubSubSubscriber.init();
return this.redisServicePubSubSubscriber;
}
async getPubSubPublisher() {
await this.redisServicePubSubPublisher.init();
return this.redisServicePubSubPublisher;
}
}

View file

@ -21,7 +21,7 @@ export type RedisServiceCommand =
| 'clear-test-webhooks'; // multi-main only
/**
* An object to be sent via Redis pub/sub from the main process to the workers.
* An object to be sent via Redis pubsub from the main process to the workers.
* @field command: The command to be executed.
* @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids.
* @field payload: Optional arguments to be sent with the command.

View file

@ -1,60 +0,0 @@
import { Service } from 'typedi';
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis-constants';
import { RedisServiceBaseSender } from './redis-service-base-classes';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from './redis-service-commands';
@Service()
export class RedisServicePubSubPublisher extends RedisServiceBaseSender {
async init(): Promise<void> {
await super.init('publisher(n8n)');
}
async publish(channel: string, message: string): Promise<void> {
if (!this.redisClient) {
await this.init();
}
await this.redisClient?.publish(channel, message);
}
async publishToCommandChannel(
message: Omit<RedisServiceCommandObject, 'senderId'>,
): Promise<void> {
const messageWithSenderId = message as RedisServiceCommandObject;
messageWithSenderId.senderId = this.senderId;
await this.publish(COMMAND_REDIS_CHANNEL, JSON.stringify(messageWithSenderId));
}
async publishToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise<void> {
await this.publish(WORKER_RESPONSE_REDIS_CHANNEL, JSON.stringify(message));
}
async setIfNotExists(key: string, value: string) {
if (!this.redisClient) await this.init();
const success = await this.redisClient?.setnx(key, value);
return !!success;
}
async setExpiration(key: string, ttl: number) {
if (!this.redisClient) await this.init();
await this.redisClient?.expire(key, ttl);
}
async get(key: string) {
if (!this.redisClient) await this.init();
return await this.redisClient?.get(key);
}
async clear(key: string) {
if (!this.redisClient) await this.init();
await this.redisClient?.del(key);
}
}

View file

@ -1,59 +0,0 @@
import { Service } from 'typedi';
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis-constants';
import { RedisServiceBaseReceiver } from './redis-service-base-classes';
@Service()
export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
async init(): Promise<void> {
await super.init('subscriber(n8n)');
this.redisClient?.on('message', (channel: string, message: string) => {
this.messageHandlers.forEach((handler: (channel: string, message: string) => void) =>
handler(channel, message),
);
});
}
async subscribe(channel: string): Promise<void> {
if (!this.redisClient) {
await this.init();
}
await this.redisClient?.subscribe(channel, (error, _count: number) => {
if (error) {
this.logger.error(`Error subscribing to channel ${channel}`);
} else {
this.logger.debug(`Subscribed Redis PubSub client to channel: ${channel}`);
}
});
}
async unsubscribe(channel: string): Promise<void> {
if (!this.redisClient) {
return;
}
await this.redisClient?.unsubscribe(channel, (error, _count: number) => {
if (error) {
this.logger.error(`Error unsubscribing from channel ${channel}`);
} else {
this.logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`);
}
});
}
async subscribeToCommandChannel(): Promise<void> {
await this.subscribe(COMMAND_REDIS_CHANNEL);
}
async subscribeToWorkerResponseChannel(): Promise<void> {
await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL);
}
async unSubscribeFromCommandChannel(): Promise<void> {
await this.unsubscribe(COMMAND_REDIS_CHANNEL);
}
async unSubscribeFromWorkerResponseChannel(): Promise<void> {
await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL);
}
}