diff --git a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts index 16b58feb0f..0d505a921d 100644 --- a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts +++ b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts @@ -19,7 +19,7 @@ import { import { License } from '@/License'; import { InternalHooks } from '@/InternalHooks'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; -import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; +import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; @Service() export class ExternalSecretsManager { @@ -82,7 +82,7 @@ export class ExternalSecretsManager { } async broadcastReloadExternalSecretsProviders() { - await Container.get(OrchestrationMainService).broadcastReloadExternalSecretsProviders(); + await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders(); } private decryptSecretsSettings(value: string): ExternalSecretsSettings { diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index 89f955502d..81fceac16a 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -23,6 +23,14 @@ type FeatureReturnType = Partial< } & { [K in NumericLicenseFeature]: number } & { [K in BooleanLicenseFeature]: boolean } >; +export class FeatureNotLicensedError extends Error { + constructor(feature: (typeof LICENSE_FEATURES)[keyof typeof LICENSE_FEATURES]) { + super( + `Your license does not allow for ${feature}. To enable ${feature}, please upgrade to a license that supports this feature.`, + ); + } +} + @Service() export class License { private manager: LicenseManager | undefined; @@ -204,6 +212,10 @@ export class License { return this.isFeatureEnabled(LICENSE_FEATURES.BINARY_DATA_S3); } + isMultipleMainInstancesLicensed() { + return this.isFeatureEnabled(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); + } + isVariablesEnabled() { return this.isFeatureEnabled(LICENSE_FEATURES.VARIABLES); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index bc0c813306..7175d4aa58 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -21,14 +21,14 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import * as Db from '@/Db'; import * as GenericHelpers from '@/GenericHelpers'; import { Server } from '@/Server'; -import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants'; +import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR, LICENSE_FEATURES } from '@/constants'; import { eventBus } from '@/eventbus'; import { BaseCommand } from './BaseCommand'; import { InternalHooks } from '@/InternalHooks'; -import { License } from '@/License'; +import { License, FeatureNotLicensedError } from '@/License'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { IConfig } from '@oclif/config'; -import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; +import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires @@ -112,6 +112,14 @@ export class Start extends BaseCommand { Container.get(ExecutionRepository).clearTimers(); + if (config.getEnv('leaderSelection.enabled')) { + const { MultiMainInstancePublisher } = await import( + '@/services/orchestration/main/MultiMainInstance.publisher.ee' + ); + + await Container.get(MultiMainInstancePublisher).destroy(); + } + await Container.get(InternalHooks).onN8nStop(); // Wait for active workflow executions to finish @@ -215,10 +223,24 @@ export class Start extends BaseCommand { } async initOrchestration() { - if (config.get('executions.mode') === 'queue') { - await Container.get(OrchestrationMainService).init(); + if (config.get('executions.mode') !== 'queue') return; + + if (!config.get('leaderSelection.enabled')) { + await Container.get(SingleMainInstancePublisher).init(); await Container.get(OrchestrationHandlerMainService).init(); + return; } + + if (!Container.get(License).isMultipleMainInstancesLicensed()) { + throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); + } + + const { MultiMainInstancePublisher } = await import( + '@/services/orchestration/main/MultiMainInstance.publisher.ee' + ); + + await Container.get(MultiMainInstancePublisher).init(); + await Container.get(OrchestrationHandlerMainService).init(); } async run() { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index ed87820b95..438a4daf0e 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -1323,4 +1323,25 @@ export const schema = { env: 'N8N_WORKFLOW_HISTORY_PRUNE_TIME', }, }, + + leaderSelection: { + enabled: { + doc: 'Whether to enable leader selection for multiple main instances (license required)', + format: Boolean, + default: false, + env: 'N8N_LEADER_SELECTION_ENABLED', + }, + ttl: { + doc: 'Time to live in Redis for leader selection key, in seconds', + format: Number, + default: 10, + env: 'N8N_LEADER_SELECTION_KEY_TTL', + }, + interval: { + doc: 'Interval in Redis for leader selection check, in seconds', + format: Number, + default: 3, + env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL', + }, + }, }; diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index c9209f4c84..d54c463646 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -85,6 +85,7 @@ export const LICENSE_FEATURES = { WORKFLOW_HISTORY: 'feat:workflowHistory', DEBUG_IN_EDITOR: 'feat:debugInEditor', BINARY_DATA_S3: 'feat:binaryDataS3', + MULTIPLE_MAIN_INSTANCES: 'feat:multipleMainInstances', } as const; export const LICENSE_QUOTAS = { diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts index c81f59fc61..f4a0c71e26 100644 --- a/packages/cli/src/controllers/e2e.controller.ts +++ b/packages/cli/src/controllers/e2e.controller.ts @@ -67,6 +67,7 @@ export class E2EController { [LICENSE_FEATURES.WORKFLOW_HISTORY]: false, [LICENSE_FEATURES.DEBUG_IN_EDITOR]: false, [LICENSE_FEATURES.BINARY_DATA_S3]: false, + [LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES]: false, }; constructor( diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index 4d997107cc..b11957f682 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,13 +1,13 @@ import { Authorized, Get, RestController } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; import { Service } from 'typedi'; -import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; +import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; @Authorized(['global', 'owner']) @RestController('/orchestration') @Service() export class OrchestrationController { - constructor(private readonly orchestrationService: OrchestrationMainService) {} + constructor(private readonly orchestrationService: SingleMainInstancePublisher) {} /** * These endpoint currently do not return anything, they just trigger the messsage to diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 80ed8d3507..43cd1b5d4a 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -32,7 +32,7 @@ import { Container, Service } from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; +import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { Logger } from '@/Logger'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -207,7 +207,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()].startListening(); if (notifyWorkers) { await Container.get( - OrchestrationMainService, + SingleMainInstancePublisher, ).broadcastRestartEventbusAfterDestinationUpdate(); } return destination; @@ -235,7 +235,7 @@ export class MessageEventBus extends EventEmitter { } if (notifyWorkers) { await Container.get( - OrchestrationMainService, + SingleMainInstancePublisher, ).broadcastRestartEventbusAfterDestinationUpdate(); } return result; diff --git a/packages/cli/src/services/orchestration.base.service.ts b/packages/cli/src/services/orchestration.base.service.ts index 8440407283..d81e9ca05c 100644 --- a/packages/cli/src/services/orchestration.base.service.ts +++ b/packages/cli/src/services/orchestration.base.service.ts @@ -6,6 +6,8 @@ import config from '@/config'; export abstract class OrchestrationService { protected initialized = false; + protected queueModeId: string; + redisPublisher: RedisServicePubSubPublisher; readonly redisService: RedisService; @@ -28,6 +30,7 @@ export abstract class OrchestrationService { constructor() { this.redisService = Container.get(RedisService); + this.queueModeId = config.getEnv('redis.queueModeId'); } sanityCheck(): boolean { @@ -44,7 +47,7 @@ export abstract class OrchestrationService { this.initialized = false; } - private async initPublisher() { + protected async initPublisher() { this.redisPublisher = await this.redisService.getPubSubPublisher(); } } diff --git a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts new file mode 100644 index 0000000000..2c28fa61c8 --- /dev/null +++ b/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts @@ -0,0 +1,84 @@ +import config from '@/config'; +import { Service } from 'typedi'; +import { TIME } from '@/constants'; +import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; +import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; + +/** + * For use in main instance, in multiple main instances cluster. + */ +@Service() +export class MultiMainInstancePublisher extends SingleMainInstancePublisher { + private id = this.queueModeId; + + private leaderId: string | undefined; + + private get isLeader() { + return this.id === this.leaderId; + } + + private readonly leaderKey = getRedisPrefix() + ':main_instance_leader'; + + private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl'); + + private leaderCheckInterval: NodeJS.Timer | undefined; + + async init() { + await this.initPublisher(); + + this.initialized = true; + + await this.tryBecomeLeader(); + + this.leaderCheckInterval = setInterval( + async () => { + await this.checkLeader(); + }, + config.getEnv('leaderSelection.interval') * TIME.SECOND, + ); + } + + async destroy() { + clearInterval(this.leaderCheckInterval); + + if (this.isLeader) await this.redisPublisher.clear(this.leaderKey); + } + + private async checkLeader() { + if (!this.redisPublisher.redisClient) return; + + const leaderId = await this.redisPublisher.get(this.leaderKey); + + if (!leaderId) { + this.logger.debug('Leadership vacant, attempting to become leader...'); + await this.tryBecomeLeader(); + + return; + } + + if (this.isLeader) { + this.logger.debug(`Leader is this instance "${this.id}"`); + + await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); + } else { + this.logger.debug(`Leader is other instance "${leaderId}"`); + + this.leaderId = leaderId; + } + } + + private async tryBecomeLeader() { + if (this.isLeader || !this.redisPublisher.redisClient) return; + + // this can only succeed if leadership is currently vacant + const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id); + + if (keySetSuccessfully) { + this.logger.debug(`Leader is now this instance "${this.id}"`); + + this.leaderId = this.id; + + await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); + } + } +} diff --git a/packages/cli/src/services/orchestration/main/SingleMainInstance.publisher.ts b/packages/cli/src/services/orchestration/main/SingleMainInstance.publisher.ts new file mode 100644 index 0000000000..7773ffce47 --- /dev/null +++ b/packages/cli/src/services/orchestration/main/SingleMainInstance.publisher.ts @@ -0,0 +1,60 @@ +import { Logger } from '@/Logger'; +import { Service } from 'typedi'; +import { OrchestrationService } from '@/services/orchestration.base.service'; + +/** + * For use in main instance, in single main instance scenario. + */ +@Service() +export class SingleMainInstancePublisher extends OrchestrationService { + constructor(protected readonly logger: Logger) { + super(); + } + + sanityCheck() { + return this.initialized && this.isQueueMode && this.isMainInstance; + } + + async getWorkerStatus(id?: string) { + if (!this.sanityCheck()) return; + + const command = 'getStatus'; + + this.logger.debug(`Sending "${command}" to command channel`); + + await this.redisPublisher.publishToCommandChannel({ + command, + targets: id ? [id] : undefined, + }); + } + + async getWorkerIds() { + if (!this.sanityCheck()) return; + + const command = 'getId'; + + this.logger.debug(`Sending "${command}" to command channel`); + + await this.redisPublisher.publishToCommandChannel({ command }); + } + + async broadcastRestartEventbusAfterDestinationUpdate() { + if (!this.sanityCheck()) return; + + const command = 'restartEventBus'; + + this.logger.debug(`Sending "${command}" to command channel`); + + await this.redisPublisher.publishToCommandChannel({ command }); + } + + async broadcastReloadExternalSecretsProviders() { + if (!this.sanityCheck()) return; + + const command = 'reloadExternalSecretsProviders'; + + this.logger.debug(`Sending "${command}" to command channel`); + + await this.redisPublisher.publishToCommandChannel({ command }); + } +} diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 5fb3de35c8..77d3e6f8cd 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -34,7 +34,8 @@ export async function handleCommandMessageMain(messageString: string) { }; return message; } - if (isMainInstance) { + + if (isMainInstance && !config.getEnv('leaderSelection.enabled')) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently logger.error( 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', diff --git a/packages/cli/src/services/orchestration/main/orchestration.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.main.service.ts deleted file mode 100644 index c8105dbe17..0000000000 --- a/packages/cli/src/services/orchestration/main/orchestration.main.service.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { Service } from 'typedi'; -import { OrchestrationService } from '../../orchestration.base.service'; - -@Service() -export class OrchestrationMainService extends OrchestrationService { - sanityCheck(): boolean { - return this.initialized && this.isQueueMode && this.isMainInstance; - } - - async getWorkerStatus(id?: string) { - if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ - command: 'getStatus', - targets: id ? [id] : undefined, - }); - } - - async getWorkerIds() { - if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ - command: 'getId', - }); - } - - async broadcastRestartEventbusAfterDestinationUpdate() { - if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ - command: 'restartEventBus', - }); - } - - async broadcastReloadExternalSecretsProviders() { - if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ - command: 'reloadExternalSecretsProviders', - }); - } -} diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts index 3825cc2cc3..3e23208138 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -39,4 +39,30 @@ export class RedisServicePubSubPublisher extends RedisServiceBaseSender { async publishToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise { await this.publish(WORKER_RESPONSE_REDIS_CHANNEL, JSON.stringify(message)); } + + async setIfNotExists(key: string, value: string) { + if (!this.redisClient) await this.init(); + + const success = await this.redisClient?.setnx(key, value); + + return !!success; + } + + async setExpiration(key: string, ttl: number) { + if (!this.redisClient) await this.init(); + + await this.redisClient?.expire(key, ttl); + } + + async get(key: string) { + if (!this.redisClient) await this.init(); + + return this.redisClient?.get(key); + } + + async clear(key: string) { + if (!this.redisClient) await this.init(); + + await this.redisClient?.del(key); + } } diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 18b8e7ea0c..3381f9b409 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -1,6 +1,6 @@ import Container from 'typedi'; import config from '@/config'; -import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; +import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { eventBus } from '@/eventbus'; import { RedisService } from '@/services/redis.service'; @@ -12,7 +12,7 @@ import * as helpers from '@/services/orchestration/helpers'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { Logger } from '@/Logger'; -const os = Container.get(OrchestrationMainService); +const os = Container.get(SingleMainInstancePublisher); const handler = Container.get(OrchestrationHandlerMainService); let queueModeId: string;