mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-24 04:04:06 -08:00
feat(core): Add command to trigger license refresh on workers (#7184)
This PR implements the updated license SDK so that worker and webhook instances do not auto-renew licenses any more. Instead, they receive a `reloadLicense` command via the Redis client that will fetch the updated license after it was saved on the main instance This also contains some refactoring with moving redis sub and pub clients into the event bus directly, to prevent cyclic dependency issues.
This commit is contained in:
parent
d317e09c59
commit
9f797b96d8
|
@ -100,7 +100,7 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"@n8n/client-oauth2": "workspace:*",
|
||||
"@n8n_io/license-sdk": "~2.5.1",
|
||||
"@n8n_io/license-sdk": "~2.6.0",
|
||||
"@oclif/command": "^1.8.16",
|
||||
"@oclif/core": "^1.16.4",
|
||||
"@oclif/errors": "^1.3.6",
|
||||
|
|
|
@ -11,8 +11,10 @@ import {
|
|||
SETTINGS_LICENSE_CERT_KEY,
|
||||
UNLIMITED_LICENSE_QUOTA,
|
||||
} from './constants';
|
||||
import { Service } from 'typedi';
|
||||
import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces';
|
||||
import Container, { Service } from 'typedi';
|
||||
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
|
||||
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
|
||||
import { RedisService } from './services/redis.service';
|
||||
|
||||
type FeatureReturnType = Partial<
|
||||
{
|
||||
|
@ -26,18 +28,28 @@ export class License {
|
|||
|
||||
private manager: LicenseManager | undefined;
|
||||
|
||||
instanceId: string | undefined;
|
||||
|
||||
private redisPublisher: RedisServicePubSubPublisher;
|
||||
|
||||
constructor() {
|
||||
this.logger = getLogger();
|
||||
}
|
||||
|
||||
async init(instanceId: string) {
|
||||
async init(instanceId: string, instanceType: N8nInstanceType = 'main') {
|
||||
if (this.manager) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.instanceId = instanceId;
|
||||
const isMainInstance = instanceType === 'main';
|
||||
const server = config.getEnv('license.serverUrl');
|
||||
const autoRenewEnabled = config.getEnv('license.autoRenewEnabled');
|
||||
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
|
||||
const offlineMode = !isMainInstance;
|
||||
const autoRenewOffset = config.getEnv('license.autoRenewOffset');
|
||||
const saveCertStr = isMainInstance
|
||||
? async (value: TLicenseBlock) => this.saveCertStr(value)
|
||||
: async () => {};
|
||||
|
||||
try {
|
||||
this.manager = new LicenseManager({
|
||||
|
@ -46,9 +58,10 @@ export class License {
|
|||
productIdentifier: `n8n-${N8N_VERSION}`,
|
||||
autoRenewEnabled,
|
||||
autoRenewOffset,
|
||||
offlineMode,
|
||||
logger: this.logger,
|
||||
loadCertStr: async () => this.loadCertStr(),
|
||||
saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value),
|
||||
saveCertStr,
|
||||
deviceFingerprint: () => instanceId,
|
||||
});
|
||||
|
||||
|
@ -86,6 +99,15 @@ export class License {
|
|||
},
|
||||
['key'],
|
||||
);
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
if (!this.redisPublisher) {
|
||||
this.logger.debug('Initializing Redis publisher for License Service');
|
||||
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
|
||||
}
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'reloadLicense',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async activate(activationKey: string): Promise<void> {
|
||||
|
@ -96,6 +118,14 @@ export class License {
|
|||
await this.manager.activate(activationKey);
|
||||
}
|
||||
|
||||
async reload(): Promise<void> {
|
||||
if (!this.manager) {
|
||||
return;
|
||||
}
|
||||
this.logger.debug('Reloading license');
|
||||
await this.manager.reload();
|
||||
}
|
||||
|
||||
async renew() {
|
||||
if (!this.manager) {
|
||||
return;
|
||||
|
|
|
@ -1470,7 +1470,9 @@ export class Server extends AbstractServer {
|
|||
// ----------------------------------------
|
||||
|
||||
if (!eventBus.isInitialized) {
|
||||
await eventBus.initialize();
|
||||
await eventBus.initialize({
|
||||
uniqueInstanceId: this.uniqueInstanceId,
|
||||
});
|
||||
}
|
||||
|
||||
if (this.endpointPresetCredentials !== '') {
|
||||
|
|
|
@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting';
|
|||
import { ExternalHooks } from '@/ExternalHooks';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import type { IExternalHooksClass } from '@/Interfaces';
|
||||
import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces';
|
||||
import { InternalHooks } from '@/InternalHooks';
|
||||
import { PostHogClient } from '@/posthog';
|
||||
import { License } from '@/License';
|
||||
|
@ -113,9 +113,9 @@ export abstract class BaseCommand extends Command {
|
|||
await this.externalHooks.init();
|
||||
}
|
||||
|
||||
async initLicense(): Promise<void> {
|
||||
async initLicense(instanceType: N8nInstanceType = 'main'): Promise<void> {
|
||||
const license = Container.get(License);
|
||||
await license.init(this.instanceId);
|
||||
await license.init(this.instanceId, instanceType);
|
||||
|
||||
const activationKey = config.getEnv('license.activationKey');
|
||||
|
||||
|
|
|
@ -197,7 +197,7 @@ export class Start extends BaseCommand {
|
|||
this.logger.info('Initializing n8n process');
|
||||
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
|
||||
|
||||
await this.initLicense();
|
||||
await this.initLicense('main');
|
||||
await this.initBinaryManager();
|
||||
await this.initExternalHooks();
|
||||
await this.initExternalSecrets();
|
||||
|
|
|
@ -77,7 +77,7 @@ export class Webhook extends BaseCommand {
|
|||
await this.initCrashJournal();
|
||||
await super.init();
|
||||
|
||||
await this.initLicense();
|
||||
await this.initLicense('webhook');
|
||||
await this.initBinaryManager();
|
||||
await this.initExternalHooks();
|
||||
await this.initExternalSecrets();
|
||||
|
|
|
@ -256,7 +256,7 @@ export class Worker extends BaseCommand {
|
|||
this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`);
|
||||
this.logger.debug('Starting n8n worker...');
|
||||
|
||||
await this.initLicense();
|
||||
await this.initLicense('worker');
|
||||
await this.initBinaryManager();
|
||||
await this.initExternalHooks();
|
||||
await this.initExternalSecrets();
|
||||
|
@ -268,6 +268,7 @@ export class Worker extends BaseCommand {
|
|||
async initEventBus() {
|
||||
await eventBus.initialize({
|
||||
workerId: this.uniqueInstanceId,
|
||||
uniqueInstanceId: this.uniqueInstanceId,
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -295,6 +296,7 @@ export class Worker extends BaseCommand {
|
|||
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||
getWorkerCommandReceivedHandler({
|
||||
uniqueInstanceId: this.uniqueInstanceId,
|
||||
instanceId: this.instanceId,
|
||||
redisPublisher: this.redisPublisher,
|
||||
getRunningJobIds: () => Object.keys(Worker.runningJobs),
|
||||
}),
|
||||
|
|
|
@ -109,7 +109,7 @@ export class E2EController {
|
|||
|
||||
private async resetLogStreaming() {
|
||||
for (const id in eventBus.destinations) {
|
||||
await eventBus.removeDestination(id);
|
||||
await eventBus.removeDestination(id, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
import config from '@/config';
|
||||
import { Authorized, Get, RestController } from '@/decorators';
|
||||
import { OrchestrationRequest } from '@/requests';
|
||||
import { Service } from 'typedi';
|
||||
import { OrchestrationService } from '../services/orchestration.service';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
|
||||
@Authorized(['global', 'owner'])
|
||||
@RestController('/orchestration')
|
||||
@Service()
|
||||
export class OrchestrationController {
|
||||
private config = config;
|
||||
|
||||
constructor(private readonly orchestrationService: OrchestrationService) {}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { LoggerProxy } from 'n8n-workflow';
|
||||
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
||||
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
|
||||
import type { DeleteResult } from 'typeorm';
|
||||
import type {
|
||||
|
@ -27,9 +27,18 @@ import {
|
|||
} from '../EventMessageClasses/EventMessageGeneric';
|
||||
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
|
||||
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
|
||||
import Container from 'typedi';
|
||||
import Container, { Service } from 'typedi';
|
||||
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
|
||||
import { OrchestrationService } from '../../services/orchestration.service';
|
||||
import { RedisService } from '@/services/redis.service';
|
||||
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
||||
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
|
||||
import {
|
||||
COMMAND_REDIS_CHANNEL,
|
||||
EVENT_BUS_REDIS_CHANNEL,
|
||||
} from '@/services/redis/RedisServiceHelper';
|
||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers';
|
||||
|
||||
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
|
||||
|
||||
|
@ -41,13 +50,21 @@ export interface MessageWithCallback {
|
|||
export interface MessageEventBusInitializeOptions {
|
||||
skipRecoveryPass?: boolean;
|
||||
workerId?: string;
|
||||
uniqueInstanceId?: string;
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class MessageEventBus extends EventEmitter {
|
||||
private static instance: MessageEventBus;
|
||||
|
||||
isInitialized: boolean;
|
||||
|
||||
uniqueInstanceId: string;
|
||||
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
|
||||
redisSubscriber: RedisServicePubSubSubscriber;
|
||||
|
||||
logWriter: MessageEventBusLogWriter;
|
||||
|
||||
destinations: {
|
||||
|
@ -76,11 +93,30 @@ export class MessageEventBus extends EventEmitter {
|
|||
*
|
||||
* Sets `isInitialized` to `true` once finished.
|
||||
*/
|
||||
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> {
|
||||
async initialize(options: MessageEventBusInitializeOptions): Promise<void> {
|
||||
if (this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.uniqueInstanceId = options?.uniqueInstanceId ?? '';
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
|
||||
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
|
||||
await this.redisSubscriber.subscribeToEventLog();
|
||||
await this.redisSubscriber.subscribeToCommandChannel();
|
||||
this.redisSubscriber.addMessageHandler(
|
||||
'MessageEventBusMessageReceiver',
|
||||
async (channel: string, messageString: string) => {
|
||||
if (channel === EVENT_BUS_REDIS_CHANNEL) {
|
||||
await this.handleRedisEventBusMessage(messageString);
|
||||
} else if (channel === COMMAND_REDIS_CHANNEL) {
|
||||
await this.handleRedisCommandMessage(messageString);
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
LoggerProxy.debug('Initializing event bus...');
|
||||
|
||||
const savedEventDestinations = await Db.collections.EventDestinations.find({});
|
||||
|
@ -89,7 +125,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
try {
|
||||
const destination = messageEventBusDestinationFromDb(this, destinationData);
|
||||
if (destination) {
|
||||
await this.addDestination(destination);
|
||||
await this.addDestination(destination, false);
|
||||
}
|
||||
} catch (error) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
|
@ -182,10 +218,13 @@ export class MessageEventBus extends EventEmitter {
|
|||
this.isInitialized = true;
|
||||
}
|
||||
|
||||
async addDestination(destination: MessageEventBusDestination) {
|
||||
await this.removeDestination(destination.getId());
|
||||
async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) {
|
||||
await this.removeDestination(destination.getId(), false);
|
||||
this.destinations[destination.getId()] = destination;
|
||||
this.destinations[destination.getId()].startListening();
|
||||
if (notifyWorkers) {
|
||||
await this.broadcastRestartEventbusAfterDestinationUpdate();
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
|
||||
|
@ -199,19 +238,62 @@ export class MessageEventBus extends EventEmitter {
|
|||
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
|
||||
}
|
||||
|
||||
async removeDestination(id: string): Promise<DeleteResult | undefined> {
|
||||
async removeDestination(
|
||||
id: string,
|
||||
notifyWorkers: boolean = true,
|
||||
): Promise<DeleteResult | undefined> {
|
||||
let result;
|
||||
if (Object.keys(this.destinations).includes(id)) {
|
||||
await this.destinations[id].close();
|
||||
result = await this.destinations[id].deleteFromDb();
|
||||
delete this.destinations[id];
|
||||
}
|
||||
if (notifyWorkers) {
|
||||
await this.broadcastRestartEventbusAfterDestinationUpdate();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async handleRedisEventBusMessage(messageString: string) {
|
||||
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
|
||||
if (eventData) {
|
||||
const eventMessage = getEventMessageObjectByType(eventData);
|
||||
if (eventMessage) {
|
||||
await Container.get(MessageEventBus).send(eventMessage);
|
||||
}
|
||||
}
|
||||
return eventData;
|
||||
}
|
||||
|
||||
async handleRedisCommandMessage(messageString: string) {
|
||||
const message = messageToRedisServiceCommandObject(messageString);
|
||||
if (message) {
|
||||
if (
|
||||
message.senderId === this.uniqueInstanceId ||
|
||||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
|
||||
) {
|
||||
LoggerProxy.debug(
|
||||
`Skipping command message ${message.command} because it's not for this instance.`,
|
||||
);
|
||||
return message;
|
||||
}
|
||||
switch (message.command) {
|
||||
case 'restartEventBus':
|
||||
await this.restart();
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return message;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
async broadcastRestartEventbusAfterDestinationUpdate() {
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
await Container.get(OrchestrationService).restartEventBus();
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
senderId: this.uniqueInstanceId,
|
||||
command: 'restartEventBus',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -235,6 +317,8 @@ export class MessageEventBus extends EventEmitter {
|
|||
);
|
||||
await this.destinations[destinationName].close();
|
||||
}
|
||||
await this.redisSubscriber?.unSubscribeFromCommandChannel();
|
||||
await this.redisSubscriber?.unSubscribeFromEventLog();
|
||||
this.isInitialized = false;
|
||||
LoggerProxy.debug('EventBus shut down.');
|
||||
}
|
||||
|
@ -417,4 +501,4 @@ export class MessageEventBus extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
export const eventBus = MessageEventBus.getInstance();
|
||||
export const eventBus = Container.get(MessageEventBus);
|
||||
|
|
|
@ -2,19 +2,9 @@ import { Service } from 'typedi';
|
|||
import { RedisService } from './redis.service';
|
||||
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
|
||||
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
|
||||
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
||||
import { eventBus } from '../eventbus';
|
||||
import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers';
|
||||
import type {
|
||||
RedisServiceCommandObject,
|
||||
RedisServiceWorkerResponseObject,
|
||||
} from './redis/RedisServiceCommands';
|
||||
import {
|
||||
COMMAND_REDIS_CHANNEL,
|
||||
EVENT_BUS_REDIS_CHANNEL,
|
||||
WORKER_RESPONSE_REDIS_CHANNEL,
|
||||
} from './redis/RedisServiceHelper';
|
||||
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper';
|
||||
import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
|
||||
import { handleCommandMessage } from './orchestration/handleCommandMessage';
|
||||
|
||||
@Service()
|
||||
export class OrchestrationService {
|
||||
|
@ -51,81 +41,21 @@ export class OrchestrationService {
|
|||
private async initSubscriber() {
|
||||
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
|
||||
|
||||
// TODO: these are all proof of concept implementations for the moment
|
||||
// until worker communication is implemented
|
||||
// #region proof of concept
|
||||
await this.redisSubscriber.subscribeToEventLog();
|
||||
await this.redisSubscriber.subscribeToWorkerResponseChannel();
|
||||
await this.redisSubscriber.subscribeToCommandChannel();
|
||||
|
||||
this.redisSubscriber.addMessageHandler(
|
||||
'OrchestrationMessageReceiver',
|
||||
async (channel: string, messageString: string) => {
|
||||
// TODO: this is a proof of concept implementation to forward events to the main instance's event bus
|
||||
// Events are arriving through a pub/sub channel and are forwarded to the eventBus
|
||||
// In the future, a stream should probably replace this implementation entirely
|
||||
if (channel === EVENT_BUS_REDIS_CHANNEL) {
|
||||
await this.handleEventBusMessage(messageString);
|
||||
} else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
|
||||
await this.handleWorkerResponseMessage(messageString);
|
||||
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
|
||||
await handleWorkerResponseMessage(messageString);
|
||||
} else if (channel === COMMAND_REDIS_CHANNEL) {
|
||||
await this.handleCommandMessage(messageString);
|
||||
await handleCommandMessage(messageString, this.uniqueInstanceId);
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async handleWorkerResponseMessage(messageString: string) {
|
||||
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
|
||||
if (workerResponse) {
|
||||
// TODO: Handle worker response
|
||||
LoggerProxy.debug('Received worker response', workerResponse);
|
||||
}
|
||||
return workerResponse;
|
||||
}
|
||||
|
||||
async handleEventBusMessage(messageString: string) {
|
||||
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
|
||||
if (eventData) {
|
||||
const eventMessage = getEventMessageObjectByType(eventData);
|
||||
if (eventMessage) {
|
||||
await eventBus.send(eventMessage);
|
||||
}
|
||||
}
|
||||
return eventData;
|
||||
}
|
||||
|
||||
async handleCommandMessage(messageString: string) {
|
||||
if (!messageString) return;
|
||||
let message: RedisServiceCommandObject;
|
||||
try {
|
||||
message = jsonParse<RedisServiceCommandObject>(messageString);
|
||||
} catch {
|
||||
LoggerProxy.debug(
|
||||
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (message) {
|
||||
if (
|
||||
message.senderId === this.uniqueInstanceId ||
|
||||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
|
||||
) {
|
||||
LoggerProxy.debug(
|
||||
`Skipping command message ${message.command} because it's not for this instance.`,
|
||||
);
|
||||
return message;
|
||||
}
|
||||
switch (message.command) {
|
||||
case 'restartEventBus':
|
||||
await eventBus.restart();
|
||||
break;
|
||||
}
|
||||
return message;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
async getWorkerStatus(id?: string) {
|
||||
if (!this.initialized) {
|
||||
throw new Error('OrchestrationService not initialized');
|
||||
|
@ -159,13 +89,14 @@ export class OrchestrationService {
|
|||
});
|
||||
}
|
||||
|
||||
async restartEventBus(id?: string) {
|
||||
// reload the license on workers after it was changed on the main instance
|
||||
async reloadLicense(id?: string) {
|
||||
if (!this.initialized) {
|
||||
throw new Error('OrchestrationService not initialized');
|
||||
}
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
senderId: this.uniqueInstanceId,
|
||||
command: 'restartEventBus',
|
||||
command: 'reloadLicense',
|
||||
targets: id ? [id] : undefined,
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
import { LoggerProxy } from 'n8n-workflow';
|
||||
import { messageToRedisServiceCommandObject } from './helpers';
|
||||
import Container from 'typedi';
|
||||
import { License } from '@/License';
|
||||
|
||||
// this function handles commands sent to the MAIN instance. the workers handle their own commands
|
||||
export async function handleCommandMessage(messageString: string, uniqueInstanceId: string) {
|
||||
const message = messageToRedisServiceCommandObject(messageString);
|
||||
if (message) {
|
||||
if (
|
||||
message.senderId === uniqueInstanceId ||
|
||||
(message.targets && !message.targets.includes(uniqueInstanceId))
|
||||
) {
|
||||
LoggerProxy.debug(
|
||||
`Skipping command message ${message.command} because it's not for this instance.`,
|
||||
);
|
||||
return message;
|
||||
}
|
||||
switch (message.command) {
|
||||
case 'reloadLicense':
|
||||
await Container.get(License).reload();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return message;
|
||||
}
|
||||
return;
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
||||
import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands';
|
||||
|
||||
export async function handleWorkerResponseMessage(messageString: string) {
|
||||
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
|
||||
if (workerResponse) {
|
||||
// TODO: Handle worker response
|
||||
LoggerProxy.debug('Received worker response', workerResponse);
|
||||
}
|
||||
return workerResponse;
|
||||
}
|
17
packages/cli/src/services/orchestration/helpers.ts
Normal file
17
packages/cli/src/services/orchestration/helpers.ts
Normal file
|
@ -0,0 +1,17 @@
|
|||
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
||||
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
|
||||
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
|
||||
|
||||
export function messageToRedisServiceCommandObject(messageString: string) {
|
||||
if (!messageString) return;
|
||||
let message: RedisServiceCommandObject;
|
||||
try {
|
||||
message = jsonParse<RedisServiceCommandObject>(messageString);
|
||||
} catch {
|
||||
LoggerProxy.debug(
|
||||
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
return message;
|
||||
}
|
|
@ -49,6 +49,7 @@ class RedisServiceBase {
|
|||
return;
|
||||
}
|
||||
await this.redisClient.quit();
|
||||
this.isInitialized = false;
|
||||
this.redisClient = undefined;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands
|
||||
export type RedisServiceCommand =
|
||||
| 'getStatus'
|
||||
| 'getId'
|
||||
| 'restartEventBus'
|
||||
| 'stopWorker'
|
||||
| 'reloadLicense';
|
||||
|
||||
/**
|
||||
* An object to be sent via Redis pub/sub from the main process to the workers.
|
||||
|
@ -7,7 +12,7 @@ export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 's
|
|||
* @field payload: Optional arguments to be sent with the command.
|
||||
*/
|
||||
type RedisServiceBaseCommand = {
|
||||
senderId: string;
|
||||
senderId?: string;
|
||||
command: RedisServiceCommand;
|
||||
payload?: {
|
||||
[key: string]: string | number | boolean | string[] | number[] | boolean[];
|
||||
|
|
|
@ -32,6 +32,19 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
|
|||
});
|
||||
}
|
||||
|
||||
async unsubscribe(channel: string): Promise<void> {
|
||||
if (!this.redisClient) {
|
||||
return;
|
||||
}
|
||||
await this.redisClient?.unsubscribe(channel, (error, _count: number) => {
|
||||
if (error) {
|
||||
Logger.error(`Error unsubscribing from channel ${channel}`);
|
||||
} else {
|
||||
Logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async subscribeToEventLog(): Promise<void> {
|
||||
await this.subscribe(EVENT_BUS_REDIS_CHANNEL);
|
||||
}
|
||||
|
@ -43,4 +56,16 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
|
|||
async subscribeToWorkerResponseChannel(): Promise<void> {
|
||||
await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL);
|
||||
}
|
||||
|
||||
async unSubscribeFromEventLog(): Promise<void> {
|
||||
await this.unsubscribe(EVENT_BUS_REDIS_CHANNEL);
|
||||
}
|
||||
|
||||
async unSubscribeFromCommandChannel(): Promise<void> {
|
||||
await this.unsubscribe(COMMAND_REDIS_CHANNEL);
|
||||
}
|
||||
|
||||
async unSubscribeFromWorkerResponseChannel(): Promise<void> {
|
||||
await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
||||
import { eventBus } from '../eventbus';
|
||||
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
|
||||
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
|
||||
import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher';
|
||||
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
||||
import * as os from 'os';
|
||||
import Container from 'typedi';
|
||||
import { License } from '@/License';
|
||||
|
||||
export function getWorkerCommandReceivedHandler(options: {
|
||||
uniqueInstanceId: string;
|
||||
instanceId: string;
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
getRunningJobIds: () => string[];
|
||||
}) {
|
||||
|
@ -56,7 +58,6 @@ export function getWorkerCommandReceivedHandler(options: {
|
|||
});
|
||||
break;
|
||||
case 'restartEventBus':
|
||||
await eventBus.restart();
|
||||
await options.redisPublisher.publishToWorkerChannel({
|
||||
workerId: options.uniqueInstanceId,
|
||||
command: message.command,
|
||||
|
@ -65,6 +66,9 @@ export function getWorkerCommandReceivedHandler(options: {
|
|||
},
|
||||
});
|
||||
break;
|
||||
case 'reloadLicense':
|
||||
await Container.get(License).reload();
|
||||
break;
|
||||
case 'stopWorker':
|
||||
// TODO: implement proper shutdown
|
||||
// await this.stopProcess();
|
||||
|
|
|
@ -91,7 +91,9 @@ beforeAll(async () => {
|
|||
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
|
||||
config.set('eventBus.logWriter.keepLogCount', 1);
|
||||
|
||||
await eventBus.initialize();
|
||||
await eventBus.initialize({
|
||||
uniqueInstanceId: 'test',
|
||||
});
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
|
|
|
@ -31,6 +31,24 @@ describe('License', () => {
|
|||
expect(LicenseManager).toHaveBeenCalledWith({
|
||||
autoRenewEnabled: true,
|
||||
autoRenewOffset: MOCK_RENEW_OFFSET,
|
||||
offlineMode: false,
|
||||
deviceFingerprint: expect.any(Function),
|
||||
productIdentifier: `n8n-${N8N_VERSION}`,
|
||||
logger: expect.anything(),
|
||||
loadCertStr: expect.any(Function),
|
||||
saveCertStr: expect.any(Function),
|
||||
server: MOCK_SERVER_URL,
|
||||
tenantId: 1,
|
||||
});
|
||||
});
|
||||
|
||||
test('initializes license manager for worker', async () => {
|
||||
license = new License();
|
||||
await license.init(MOCK_INSTANCE_ID, 'worker');
|
||||
expect(LicenseManager).toHaveBeenCalledWith({
|
||||
autoRenewEnabled: false,
|
||||
autoRenewOffset: MOCK_RENEW_OFFSET,
|
||||
offlineMode: true,
|
||||
deviceFingerprint: expect.any(Function),
|
||||
productIdentifier: `n8n-${N8N_VERSION}`,
|
||||
logger: expect.anything(),
|
||||
|
|
|
@ -6,9 +6,11 @@ import { OrchestrationService } from '@/services/orchestration.service';
|
|||
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
|
||||
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
|
||||
import { eventBus } from '@/eventbus';
|
||||
import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers';
|
||||
import { RedisService } from '@/services/redis.service';
|
||||
import { mockInstance } from '../../integration/shared/utils';
|
||||
import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage';
|
||||
import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage';
|
||||
import { License } from '../../../src/License';
|
||||
|
||||
const os = Container.get(OrchestrationService);
|
||||
|
||||
|
@ -77,6 +79,7 @@ describe('Orchestration Service', () => {
|
|||
afterAll(async () => {
|
||||
jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks();
|
||||
jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks();
|
||||
await os.shutdown();
|
||||
});
|
||||
|
||||
test('should initialize', async () => {
|
||||
|
@ -87,38 +90,35 @@ describe('Orchestration Service', () => {
|
|||
});
|
||||
|
||||
test('should handle worker responses', async () => {
|
||||
const response = await os.handleWorkerResponseMessage(
|
||||
const response = await handleWorkerResponseMessage(
|
||||
JSON.stringify(workerRestartEventbusResponse),
|
||||
);
|
||||
expect(response.command).toEqual('restartEventBus');
|
||||
});
|
||||
|
||||
test('should handle event messages', async () => {
|
||||
const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage));
|
||||
jest.spyOn(eventBus, 'send');
|
||||
jest.spyOn(EventHelpers, 'getEventMessageObjectByType');
|
||||
expect(eventBus.send).toHaveBeenCalled();
|
||||
expect(response.eventName).toEqual('n8n.workflow.success');
|
||||
jest.spyOn(eventBus, 'send').mockRestore();
|
||||
jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore();
|
||||
});
|
||||
|
||||
test('should handle command messages from others', async () => {
|
||||
jest.spyOn(eventBus, 'restart');
|
||||
const responseFalseId = await os.handleCommandMessage(
|
||||
JSON.stringify(workerRestartEventbusResponse),
|
||||
const license = Container.get(License);
|
||||
license.instanceId = 'test';
|
||||
jest.spyOn(license, 'reload');
|
||||
const responseFalseId = await handleCommandMessage(
|
||||
JSON.stringify({
|
||||
senderId: 'test',
|
||||
command: 'reloadLicense',
|
||||
}),
|
||||
os.uniqueInstanceId,
|
||||
);
|
||||
expect(responseFalseId).toBeDefined();
|
||||
expect(responseFalseId!.command).toEqual('restartEventBus');
|
||||
expect(responseFalseId!.command).toEqual('reloadLicense');
|
||||
expect(responseFalseId!.senderId).toEqual('test');
|
||||
expect(eventBus.restart).toHaveBeenCalled();
|
||||
jest.spyOn(eventBus, 'restart').mockRestore();
|
||||
expect(license.reload).toHaveBeenCalled();
|
||||
jest.spyOn(license, 'reload').mockRestore();
|
||||
});
|
||||
|
||||
test('should reject command messages from iteslf', async () => {
|
||||
jest.spyOn(eventBus, 'restart');
|
||||
const response = await os.handleCommandMessage(
|
||||
const response = await handleCommandMessage(
|
||||
JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }),
|
||||
os.uniqueInstanceId,
|
||||
);
|
||||
expect(response).toBeDefined();
|
||||
expect(response!.command).toEqual('restartEventBus');
|
||||
|
@ -133,8 +133,4 @@ describe('Orchestration Service', () => {
|
|||
expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled();
|
||||
jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await os.shutdown();
|
||||
});
|
||||
});
|
||||
|
|
|
@ -195,8 +195,8 @@ importers:
|
|||
specifier: workspace:*
|
||||
version: link:../@n8n/client-oauth2
|
||||
'@n8n_io/license-sdk':
|
||||
specifier: ~2.5.1
|
||||
version: 2.5.1
|
||||
specifier: ~2.6.0
|
||||
version: 2.6.0
|
||||
'@oclif/command':
|
||||
specifier: ^1.8.16
|
||||
version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1)
|
||||
|
@ -4656,8 +4656,8 @@ packages:
|
|||
acorn-walk: 8.2.0
|
||||
dev: false
|
||||
|
||||
/@n8n_io/license-sdk@2.5.1:
|
||||
resolution: {integrity: sha512-CL4JVJS8nvI8qPFQ1jSG7CiPnNkeKJSgbDxWOLVX4MRjTKrwL8Cpd1LeYMx5g5StmHzkoxz2TDqL8WT6qyMlrQ==}
|
||||
/@n8n_io/license-sdk@2.6.0:
|
||||
resolution: {integrity: sha512-jPUn8xKAZMWgFw8w6BwqbdlZ1Et4tZcPUdOfEzxpWxEmgtCEAdbl3V0ygP3pTXyWY0hblvv8QzbHOUrK25hQSA==}
|
||||
engines: {node: '>=14.0.0', npm: '>=7.10.0'}
|
||||
dependencies:
|
||||
crypto-js: 4.1.1
|
||||
|
|
Loading…
Reference in a new issue