feat(core): Add command to trigger license refresh on workers (#7184)

This PR implements the updated license SDK so that worker and webhook
instances do not auto-renew licenses any more.

Instead, they receive a `reloadLicense` command via the Redis client
that will fetch the updated license after it was saved on the main
instance

This also contains some refactoring with moving redis sub and pub
clients into the event bus directly, to prevent cyclic dependency
issues.
This commit is contained in:
Michael Auerswald 2023-09-17 11:05:54 +02:00 committed by GitHub
parent d317e09c59
commit 9f797b96d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 293 additions and 139 deletions

View file

@ -100,7 +100,7 @@
}, },
"dependencies": { "dependencies": {
"@n8n/client-oauth2": "workspace:*", "@n8n/client-oauth2": "workspace:*",
"@n8n_io/license-sdk": "~2.5.1", "@n8n_io/license-sdk": "~2.6.0",
"@oclif/command": "^1.8.16", "@oclif/command": "^1.8.16",
"@oclif/core": "^1.16.4", "@oclif/core": "^1.16.4",
"@oclif/errors": "^1.3.6", "@oclif/errors": "^1.3.6",

View file

@ -11,8 +11,10 @@ import {
SETTINGS_LICENSE_CERT_KEY, SETTINGS_LICENSE_CERT_KEY,
UNLIMITED_LICENSE_QUOTA, UNLIMITED_LICENSE_QUOTA,
} from './constants'; } from './constants';
import { Service } from 'typedi'; import Container, { Service } from 'typedi';
import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces'; import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
type FeatureReturnType = Partial< type FeatureReturnType = Partial<
{ {
@ -26,18 +28,28 @@ export class License {
private manager: LicenseManager | undefined; private manager: LicenseManager | undefined;
instanceId: string | undefined;
private redisPublisher: RedisServicePubSubPublisher;
constructor() { constructor() {
this.logger = getLogger(); this.logger = getLogger();
} }
async init(instanceId: string) { async init(instanceId: string, instanceType: N8nInstanceType = 'main') {
if (this.manager) { if (this.manager) {
return; return;
} }
this.instanceId = instanceId;
const isMainInstance = instanceType === 'main';
const server = config.getEnv('license.serverUrl'); const server = config.getEnv('license.serverUrl');
const autoRenewEnabled = config.getEnv('license.autoRenewEnabled'); const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
const offlineMode = !isMainInstance;
const autoRenewOffset = config.getEnv('license.autoRenewOffset'); const autoRenewOffset = config.getEnv('license.autoRenewOffset');
const saveCertStr = isMainInstance
? async (value: TLicenseBlock) => this.saveCertStr(value)
: async () => {};
try { try {
this.manager = new LicenseManager({ this.manager = new LicenseManager({
@ -46,9 +58,10 @@ export class License {
productIdentifier: `n8n-${N8N_VERSION}`, productIdentifier: `n8n-${N8N_VERSION}`,
autoRenewEnabled, autoRenewEnabled,
autoRenewOffset, autoRenewOffset,
offlineMode,
logger: this.logger, logger: this.logger,
loadCertStr: async () => this.loadCertStr(), loadCertStr: async () => this.loadCertStr(),
saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value), saveCertStr,
deviceFingerprint: () => instanceId, deviceFingerprint: () => instanceId,
}); });
@ -86,6 +99,15 @@ export class License {
}, },
['key'], ['key'],
); );
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
}
} }
async activate(activationKey: string): Promise<void> { async activate(activationKey: string): Promise<void> {
@ -96,6 +118,14 @@ export class License {
await this.manager.activate(activationKey); await this.manager.activate(activationKey);
} }
async reload(): Promise<void> {
if (!this.manager) {
return;
}
this.logger.debug('Reloading license');
await this.manager.reload();
}
async renew() { async renew() {
if (!this.manager) { if (!this.manager) {
return; return;

View file

@ -1470,7 +1470,9 @@ export class Server extends AbstractServer {
// ---------------------------------------- // ----------------------------------------
if (!eventBus.isInitialized) { if (!eventBus.isInitialized) {
await eventBus.initialize(); await eventBus.initialize({
uniqueInstanceId: this.uniqueInstanceId,
});
} }
if (this.endpointPresetCredentials !== '') { if (this.endpointPresetCredentials !== '') {

View file

@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting';
import { ExternalHooks } from '@/ExternalHooks'; import { ExternalHooks } from '@/ExternalHooks';
import { NodeTypes } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { IExternalHooksClass } from '@/Interfaces'; import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog'; import { PostHogClient } from '@/posthog';
import { License } from '@/License'; import { License } from '@/License';
@ -113,9 +113,9 @@ export abstract class BaseCommand extends Command {
await this.externalHooks.init(); await this.externalHooks.init();
} }
async initLicense(): Promise<void> { async initLicense(instanceType: N8nInstanceType = 'main'): Promise<void> {
const license = Container.get(License); const license = Container.get(License);
await license.init(this.instanceId); await license.init(this.instanceId, instanceType);
const activationKey = config.getEnv('license.activationKey'); const activationKey = config.getEnv('license.activationKey');

View file

@ -197,7 +197,7 @@ export class Start extends BaseCommand {
this.logger.info('Initializing n8n process'); this.logger.info('Initializing n8n process');
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
await this.initLicense(); await this.initLicense('main');
await this.initBinaryManager(); await this.initBinaryManager();
await this.initExternalHooks(); await this.initExternalHooks();
await this.initExternalSecrets(); await this.initExternalSecrets();

View file

@ -77,7 +77,7 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal(); await this.initCrashJournal();
await super.init(); await super.init();
await this.initLicense(); await this.initLicense('webhook');
await this.initBinaryManager(); await this.initBinaryManager();
await this.initExternalHooks(); await this.initExternalHooks();
await this.initExternalSecrets(); await this.initExternalSecrets();

View file

@ -256,7 +256,7 @@ export class Worker extends BaseCommand {
this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`); this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`);
this.logger.debug('Starting n8n worker...'); this.logger.debug('Starting n8n worker...');
await this.initLicense(); await this.initLicense('worker');
await this.initBinaryManager(); await this.initBinaryManager();
await this.initExternalHooks(); await this.initExternalHooks();
await this.initExternalSecrets(); await this.initExternalSecrets();
@ -268,6 +268,7 @@ export class Worker extends BaseCommand {
async initEventBus() { async initEventBus() {
await eventBus.initialize({ await eventBus.initialize({
workerId: this.uniqueInstanceId, workerId: this.uniqueInstanceId,
uniqueInstanceId: this.uniqueInstanceId,
}); });
} }
@ -295,6 +296,7 @@ export class Worker extends BaseCommand {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({ getWorkerCommandReceivedHandler({
uniqueInstanceId: this.uniqueInstanceId, uniqueInstanceId: this.uniqueInstanceId,
instanceId: this.instanceId,
redisPublisher: this.redisPublisher, redisPublisher: this.redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs), getRunningJobIds: () => Object.keys(Worker.runningJobs),
}), }),

View file

@ -109,7 +109,7 @@ export class E2EController {
private async resetLogStreaming() { private async resetLogStreaming() {
for (const id in eventBus.destinations) { for (const id in eventBus.destinations) {
await eventBus.removeDestination(id); await eventBus.removeDestination(id, false);
} }
} }

View file

@ -1,15 +1,12 @@
import config from '@/config';
import { Authorized, Get, RestController } from '@/decorators'; import { Authorized, Get, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests'; import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { OrchestrationService } from '../services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
@Authorized(['global', 'owner']) @Authorized(['global', 'owner'])
@RestController('/orchestration') @RestController('/orchestration')
@Service() @Service()
export class OrchestrationController { export class OrchestrationController {
private config = config;
constructor(private readonly orchestrationService: OrchestrationService) {} constructor(private readonly orchestrationService: OrchestrationService) {}
/** /**

View file

@ -1,4 +1,4 @@
import { LoggerProxy } from 'n8n-workflow'; import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm'; import type { DeleteResult } from 'typeorm';
import type { import type {
@ -27,9 +27,18 @@ import {
} from '../EventMessageClasses/EventMessageGeneric'; } from '../EventMessageClasses/EventMessageGeneric';
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container from 'typedi'; import Container, { Service } from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { OrchestrationService } from '../../services/orchestration.service'; import { RedisService } from '@/services/redis.service';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
} from '@/services/redis/RedisServiceHelper';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@ -41,13 +50,21 @@ export interface MessageWithCallback {
export interface MessageEventBusInitializeOptions { export interface MessageEventBusInitializeOptions {
skipRecoveryPass?: boolean; skipRecoveryPass?: boolean;
workerId?: string; workerId?: string;
uniqueInstanceId?: string;
} }
@Service()
export class MessageEventBus extends EventEmitter { export class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus; private static instance: MessageEventBus;
isInitialized: boolean; isInitialized: boolean;
uniqueInstanceId: string;
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber;
logWriter: MessageEventBusLogWriter; logWriter: MessageEventBusLogWriter;
destinations: { destinations: {
@ -76,11 +93,30 @@ export class MessageEventBus extends EventEmitter {
* *
* Sets `isInitialized` to `true` once finished. * Sets `isInitialized` to `true` once finished.
*/ */
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> { async initialize(options: MessageEventBusInitializeOptions): Promise<void> {
if (this.isInitialized) { if (this.isInitialized) {
return; return;
} }
this.uniqueInstanceId = options?.uniqueInstanceId ?? '';
if (config.getEnv('executions.mode') === 'queue') {
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
await this.redisSubscriber.subscribeToEventLog();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'MessageEventBusMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleRedisEventBusMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await this.handleRedisCommandMessage(messageString);
}
},
);
}
LoggerProxy.debug('Initializing event bus...'); LoggerProxy.debug('Initializing event bus...');
const savedEventDestinations = await Db.collections.EventDestinations.find({}); const savedEventDestinations = await Db.collections.EventDestinations.find({});
@ -89,7 +125,7 @@ export class MessageEventBus extends EventEmitter {
try { try {
const destination = messageEventBusDestinationFromDb(this, destinationData); const destination = messageEventBusDestinationFromDb(this, destinationData);
if (destination) { if (destination) {
await this.addDestination(destination); await this.addDestination(destination, false);
} }
} catch (error) { } catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
@ -182,10 +218,13 @@ export class MessageEventBus extends EventEmitter {
this.isInitialized = true; this.isInitialized = true;
} }
async addDestination(destination: MessageEventBusDestination) { async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) {
await this.removeDestination(destination.getId()); await this.removeDestination(destination.getId(), false);
this.destinations[destination.getId()] = destination; this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening(); this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
}
return destination; return destination;
} }
@ -199,19 +238,62 @@ export class MessageEventBus extends EventEmitter {
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
} }
async removeDestination(id: string): Promise<DeleteResult | undefined> { async removeDestination(
id: string,
notifyWorkers: boolean = true,
): Promise<DeleteResult | undefined> {
let result; let result;
if (Object.keys(this.destinations).includes(id)) { if (Object.keys(this.destinations).includes(id)) {
await this.destinations[id].close(); await this.destinations[id].close();
result = await this.destinations[id].deleteFromDb(); result = await this.destinations[id].deleteFromDb();
delete this.destinations[id]; delete this.destinations[id];
} }
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
}
return result; return result;
} }
async handleRedisEventBusMessage(messageString: string) {
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await Container.get(MessageEventBus).send(eventMessage);
}
}
return eventData;
}
async handleRedisCommandMessage(messageString: string) {
const message = messageToRedisServiceCommandObject(messageString);
if (message) {
if (
message.senderId === this.uniqueInstanceId ||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'restartEventBus':
await this.restart();
default:
break;
}
return message;
}
return;
}
async broadcastRestartEventbusAfterDestinationUpdate() { async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') { if (config.getEnv('executions.mode') === 'queue') {
await Container.get(OrchestrationService).restartEventBus(); await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'restartEventBus',
});
} }
} }
@ -235,6 +317,8 @@ export class MessageEventBus extends EventEmitter {
); );
await this.destinations[destinationName].close(); await this.destinations[destinationName].close();
} }
await this.redisSubscriber?.unSubscribeFromCommandChannel();
await this.redisSubscriber?.unSubscribeFromEventLog();
this.isInitialized = false; this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.'); LoggerProxy.debug('EventBus shut down.');
} }
@ -417,4 +501,4 @@ export class MessageEventBus extends EventEmitter {
} }
} }
export const eventBus = MessageEventBus.getInstance(); export const eventBus = Container.get(MessageEventBus);

View file

@ -2,19 +2,9 @@ import { Service } from 'typedi';
import { RedisService } from './redis.service'; import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import { LoggerProxy, jsonParse } from 'n8n-workflow'; import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper';
import { eventBus } from '../eventbus'; import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions'; import { handleCommandMessage } from './orchestration/handleCommandMessage';
import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from './redis/RedisServiceCommands';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './redis/RedisServiceHelper';
@Service() @Service()
export class OrchestrationService { export class OrchestrationService {
@ -51,81 +41,21 @@ export class OrchestrationService {
private async initSubscriber() { private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber(); this.redisSubscriber = await this.redisService.getPubSubSubscriber();
// TODO: these are all proof of concept implementations for the moment
// until worker communication is implemented
// #region proof of concept
await this.redisSubscriber.subscribeToEventLog();
await this.redisSubscriber.subscribeToWorkerResponseChannel(); await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel(); await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler( this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver', 'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => { async (channel: string, messageString: string) => {
// TODO: this is a proof of concept implementation to forward events to the main instance's event bus if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
// Events are arriving through a pub/sub channel and are forwarded to the eventBus await handleWorkerResponseMessage(messageString);
// In the future, a stream should probably replace this implementation entirely
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleEventBusMessage(messageString);
} else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await this.handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) { } else if (channel === COMMAND_REDIS_CHANNEL) {
await this.handleCommandMessage(messageString); await handleCommandMessage(messageString, this.uniqueInstanceId);
} }
}, },
); );
} }
async handleWorkerResponseMessage(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse);
}
return workerResponse;
}
async handleEventBusMessage(messageString: string) {
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await eventBus.send(eventMessage);
}
}
return eventData;
}
async handleCommandMessage(messageString: string) {
if (!messageString) return;
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
LoggerProxy.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
if (
message.senderId === this.uniqueInstanceId ||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'restartEventBus':
await eventBus.restart();
break;
}
return message;
}
return;
}
async getWorkerStatus(id?: string) { async getWorkerStatus(id?: string) {
if (!this.initialized) { if (!this.initialized) {
throw new Error('OrchestrationService not initialized'); throw new Error('OrchestrationService not initialized');
@ -159,13 +89,14 @@ export class OrchestrationService {
}); });
} }
async restartEventBus(id?: string) { // reload the license on workers after it was changed on the main instance
async reloadLicense(id?: string) {
if (!this.initialized) { if (!this.initialized) {
throw new Error('OrchestrationService not initialized'); throw new Error('OrchestrationService not initialized');
} }
await this.redisPublisher.publishToCommandChannel({ await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId, senderId: this.uniqueInstanceId,
command: 'restartEventBus', command: 'reloadLicense',
targets: id ? [id] : undefined, targets: id ? [id] : undefined,
}); });
} }

View file

@ -0,0 +1,29 @@
import { LoggerProxy } from 'n8n-workflow';
import { messageToRedisServiceCommandObject } from './helpers';
import Container from 'typedi';
import { License } from '@/License';
// this function handles commands sent to the MAIN instance. the workers handle their own commands
export async function handleCommandMessage(messageString: string, uniqueInstanceId: string) {
const message = messageToRedisServiceCommandObject(messageString);
if (message) {
if (
message.senderId === uniqueInstanceId ||
(message.targets && !message.targets.includes(uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'reloadLicense':
await Container.get(License).reload();
break;
default:
break;
}
return message;
}
return;
}

View file

@ -0,0 +1,11 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands';
export async function handleWorkerResponseMessage(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse);
}
return workerResponse;
}

View file

@ -0,0 +1,17 @@
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
export function messageToRedisServiceCommandObject(messageString: string) {
if (!messageString) return;
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
LoggerProxy.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}
return message;
}

View file

@ -49,6 +49,7 @@ class RedisServiceBase {
return; return;
} }
await this.redisClient.quit(); await this.redisClient.quit();
this.isInitialized = false;
this.redisClient = undefined; this.redisClient = undefined;
} }
} }

View file

@ -1,4 +1,9 @@
export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands export type RedisServiceCommand =
| 'getStatus'
| 'getId'
| 'restartEventBus'
| 'stopWorker'
| 'reloadLicense';
/** /**
* An object to be sent via Redis pub/sub from the main process to the workers. * An object to be sent via Redis pub/sub from the main process to the workers.
@ -7,7 +12,7 @@ export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 's
* @field payload: Optional arguments to be sent with the command. * @field payload: Optional arguments to be sent with the command.
*/ */
type RedisServiceBaseCommand = { type RedisServiceBaseCommand = {
senderId: string; senderId?: string;
command: RedisServiceCommand; command: RedisServiceCommand;
payload?: { payload?: {
[key: string]: string | number | boolean | string[] | number[] | boolean[]; [key: string]: string | number | boolean | string[] | number[] | boolean[];

View file

@ -32,6 +32,19 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
}); });
} }
async unsubscribe(channel: string): Promise<void> {
if (!this.redisClient) {
return;
}
await this.redisClient?.unsubscribe(channel, (error, _count: number) => {
if (error) {
Logger.error(`Error unsubscribing from channel ${channel}`);
} else {
Logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`);
}
});
}
async subscribeToEventLog(): Promise<void> { async subscribeToEventLog(): Promise<void> {
await this.subscribe(EVENT_BUS_REDIS_CHANNEL); await this.subscribe(EVENT_BUS_REDIS_CHANNEL);
} }
@ -43,4 +56,16 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
async subscribeToWorkerResponseChannel(): Promise<void> { async subscribeToWorkerResponseChannel(): Promise<void> {
await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL);
} }
async unSubscribeFromEventLog(): Promise<void> {
await this.unsubscribe(EVENT_BUS_REDIS_CHANNEL);
}
async unSubscribeFromCommandChannel(): Promise<void> {
await this.unsubscribe(COMMAND_REDIS_CHANNEL);
}
async unSubscribeFromWorkerResponseChannel(): Promise<void> {
await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL);
}
} }

View file

@ -1,12 +1,14 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow'; import { jsonParse, LoggerProxy } from 'n8n-workflow';
import { eventBus } from '../eventbus';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import * as os from 'os'; import * as os from 'os';
import Container from 'typedi';
import { License } from '@/License';
export function getWorkerCommandReceivedHandler(options: { export function getWorkerCommandReceivedHandler(options: {
uniqueInstanceId: string; uniqueInstanceId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher; redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[]; getRunningJobIds: () => string[];
}) { }) {
@ -56,7 +58,6 @@ export function getWorkerCommandReceivedHandler(options: {
}); });
break; break;
case 'restartEventBus': case 'restartEventBus':
await eventBus.restart();
await options.redisPublisher.publishToWorkerChannel({ await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId, workerId: options.uniqueInstanceId,
command: message.command, command: message.command,
@ -65,6 +66,9 @@ export function getWorkerCommandReceivedHandler(options: {
}, },
}); });
break; break;
case 'reloadLicense':
await Container.get(License).reload();
break;
case 'stopWorker': case 'stopWorker':
// TODO: implement proper shutdown // TODO: implement proper shutdown
// await this.stopProcess(); // await this.stopProcess();

View file

@ -91,7 +91,9 @@ beforeAll(async () => {
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
config.set('eventBus.logWriter.keepLogCount', 1); config.set('eventBus.logWriter.keepLogCount', 1);
await eventBus.initialize(); await eventBus.initialize({
uniqueInstanceId: 'test',
});
}); });
afterAll(async () => { afterAll(async () => {

View file

@ -31,6 +31,24 @@ describe('License', () => {
expect(LicenseManager).toHaveBeenCalledWith({ expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: true, autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET, autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger: expect.anything(),
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
});
test('initializes license manager for worker', async () => {
license = new License();
await license.init(MOCK_INSTANCE_ID, 'worker');
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
deviceFingerprint: expect.any(Function), deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`, productIdentifier: `n8n-${N8N_VERSION}`,
logger: expect.anything(), logger: expect.anything(),

View file

@ -6,9 +6,11 @@ import { OrchestrationService } from '@/services/orchestration.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
import { eventBus } from '@/eventbus'; import { eventBus } from '@/eventbus';
import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers';
import { RedisService } from '@/services/redis.service'; import { RedisService } from '@/services/redis.service';
import { mockInstance } from '../../integration/shared/utils'; import { mockInstance } from '../../integration/shared/utils';
import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage';
import { License } from '../../../src/License';
const os = Container.get(OrchestrationService); const os = Container.get(OrchestrationService);
@ -77,6 +79,7 @@ describe('Orchestration Service', () => {
afterAll(async () => { afterAll(async () => {
jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks();
jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks();
await os.shutdown();
}); });
test('should initialize', async () => { test('should initialize', async () => {
@ -87,38 +90,35 @@ describe('Orchestration Service', () => {
}); });
test('should handle worker responses', async () => { test('should handle worker responses', async () => {
const response = await os.handleWorkerResponseMessage( const response = await handleWorkerResponseMessage(
JSON.stringify(workerRestartEventbusResponse), JSON.stringify(workerRestartEventbusResponse),
); );
expect(response.command).toEqual('restartEventBus'); expect(response.command).toEqual('restartEventBus');
}); });
test('should handle event messages', async () => {
const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage));
jest.spyOn(eventBus, 'send');
jest.spyOn(EventHelpers, 'getEventMessageObjectByType');
expect(eventBus.send).toHaveBeenCalled();
expect(response.eventName).toEqual('n8n.workflow.success');
jest.spyOn(eventBus, 'send').mockRestore();
jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore();
});
test('should handle command messages from others', async () => { test('should handle command messages from others', async () => {
jest.spyOn(eventBus, 'restart'); const license = Container.get(License);
const responseFalseId = await os.handleCommandMessage( license.instanceId = 'test';
JSON.stringify(workerRestartEventbusResponse), jest.spyOn(license, 'reload');
const responseFalseId = await handleCommandMessage(
JSON.stringify({
senderId: 'test',
command: 'reloadLicense',
}),
os.uniqueInstanceId,
); );
expect(responseFalseId).toBeDefined(); expect(responseFalseId).toBeDefined();
expect(responseFalseId!.command).toEqual('restartEventBus'); expect(responseFalseId!.command).toEqual('reloadLicense');
expect(responseFalseId!.senderId).toEqual('test'); expect(responseFalseId!.senderId).toEqual('test');
expect(eventBus.restart).toHaveBeenCalled(); expect(license.reload).toHaveBeenCalled();
jest.spyOn(eventBus, 'restart').mockRestore(); jest.spyOn(license, 'reload').mockRestore();
}); });
test('should reject command messages from iteslf', async () => { test('should reject command messages from iteslf', async () => {
jest.spyOn(eventBus, 'restart'); jest.spyOn(eventBus, 'restart');
const response = await os.handleCommandMessage( const response = await handleCommandMessage(
JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }),
os.uniqueInstanceId,
); );
expect(response).toBeDefined(); expect(response).toBeDefined();
expect(response!.command).toEqual('restartEventBus'); expect(response!.command).toEqual('restartEventBus');
@ -133,8 +133,4 @@ describe('Orchestration Service', () => {
expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled();
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore();
}); });
afterAll(async () => {
await os.shutdown();
});
}); });

View file

@ -195,8 +195,8 @@ importers:
specifier: workspace:* specifier: workspace:*
version: link:../@n8n/client-oauth2 version: link:../@n8n/client-oauth2
'@n8n_io/license-sdk': '@n8n_io/license-sdk':
specifier: ~2.5.1 specifier: ~2.6.0
version: 2.5.1 version: 2.6.0
'@oclif/command': '@oclif/command':
specifier: ^1.8.16 specifier: ^1.8.16
version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1)
@ -4656,8 +4656,8 @@ packages:
acorn-walk: 8.2.0 acorn-walk: 8.2.0
dev: false dev: false
/@n8n_io/license-sdk@2.5.1: /@n8n_io/license-sdk@2.6.0:
resolution: {integrity: sha512-CL4JVJS8nvI8qPFQ1jSG7CiPnNkeKJSgbDxWOLVX4MRjTKrwL8Cpd1LeYMx5g5StmHzkoxz2TDqL8WT6qyMlrQ==} resolution: {integrity: sha512-jPUn8xKAZMWgFw8w6BwqbdlZ1Et4tZcPUdOfEzxpWxEmgtCEAdbl3V0ygP3pTXyWY0hblvv8QzbHOUrK25hQSA==}
engines: {node: '>=14.0.0', npm: '>=7.10.0'} engines: {node: '>=14.0.0', npm: '>=7.10.0'}
dependencies: dependencies:
crypto-js: 4.1.1 crypto-js: 4.1.1