From be50a9ac44cfe674c31f86041bc3e25780286a87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 17 Oct 2024 10:47:17 +0200 Subject: [PATCH] chore(core): Bring multi-main setup in line with scaling services (#11289) --- .../src/configs/multi-main-setup.config.ts | 16 +++++++++++ packages/@n8n/config/src/index.ts | 4 +++ packages/@n8n/config/test/config.test.ts | 5 ++++ packages/cli/src/__tests__/license.test.ts | 25 +++++++++-------- .../cli/src/__tests__/wait-tracker.test.ts | 4 +-- packages/cli/src/commands/start.ts | 3 ++- packages/cli/src/commands/webhook.ts | 2 +- packages/cli/src/config/schema.ts | 21 --------------- .../events/relays/telemetry.event-relay.ts | 2 +- packages/cli/src/license.ts | 6 +++-- .../main => scaling}/multi-main-setup.ee.ts | 27 ++++++++++++------- .../cli/src/services/orchestration.service.ts | 6 +++-- packages/cli/src/services/pruning.service.ts | 4 ++- .../test/integration/debug.controller.test.ts | 2 +- .../test/integration/pruning.service.test.ts | 1 + 15 files changed, 76 insertions(+), 52 deletions(-) create mode 100644 packages/@n8n/config/src/configs/multi-main-setup.config.ts rename packages/cli/src/{services/orchestration/main => scaling}/multi-main-setup.ee.ts (78%) diff --git a/packages/@n8n/config/src/configs/multi-main-setup.config.ts b/packages/@n8n/config/src/configs/multi-main-setup.config.ts new file mode 100644 index 0000000000..e3599c1d55 --- /dev/null +++ b/packages/@n8n/config/src/configs/multi-main-setup.config.ts @@ -0,0 +1,16 @@ +import { Config, Env } from '../decorators'; + +@Config +export class MultiMainSetupConfig { + /** Whether to enable multi-main setup (if licensed) for scaling mode. */ + @Env('N8N_MULTI_MAIN_SETUP_ENABLED') + enabled: boolean = false; + + /** Time to live (in seconds) for leader key in multi-main setup. */ + @Env('N8N_MULTI_MAIN_SETUP_KEY_TTL') + ttl: number = 10; + + /** Interval (in seconds) for leader check in multi-main setup. */ + @Env('N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL') + interval: number = 3; +} diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index 9044ffa0fa..9682160f3c 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -6,6 +6,7 @@ import { EventBusConfig } from './configs/event-bus.config'; import { ExternalSecretsConfig } from './configs/external-secrets.config'; import { ExternalStorageConfig } from './configs/external-storage.config'; import { LoggingConfig } from './configs/logging.config'; +import { MultiMainSetupConfig } from './configs/multi-main-setup.config'; import { NodesConfig } from './configs/nodes.config'; import { PublicApiConfig } from './configs/public-api.config'; import { TaskRunnersConfig } from './configs/runners.config'; @@ -93,4 +94,7 @@ export class GlobalConfig { @Nested taskRunners: TaskRunnersConfig; + + @Nested + multiMainSetup: MultiMainSetupConfig; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 56f3bc6de7..04021a45aa 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -246,6 +246,11 @@ describe('GlobalConfig', () => { }, scopes: [], }, + multiMainSetup: { + enabled: false, + ttl: 10, + interval: 3, + }, }; it('should use all default values when no env variables are defined', () => { diff --git a/packages/cli/src/__tests__/license.test.ts b/packages/cli/src/__tests__/license.test.ts index 67a92b95cd..70aa80347a 100644 --- a/packages/cli/src/__tests__/license.test.ts +++ b/packages/cli/src/__tests__/license.test.ts @@ -1,3 +1,4 @@ +import type { GlobalConfig } from '@n8n/config'; import { LicenseManager } from '@n8n_io/license-sdk'; import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; @@ -31,7 +32,8 @@ describe('License', () => { }); beforeEach(async () => { - license = new License(mockLogger(), instanceSettings, mock(), mock(), mock()); + const globalConfig = mock({ multiMainSetup: { enabled: false } }); + license = new License(mockLogger(), instanceSettings, mock(), mock(), mock(), globalConfig); await license.init(); }); @@ -64,6 +66,7 @@ describe('License', () => { mock(), mock(), mock(), + mock(), ); await license.init(); expect(LicenseManager).toHaveBeenCalledWith( @@ -197,9 +200,9 @@ describe('License', () => { describe('in single-main setup', () => { describe('with `license.autoRenewEnabled` enabled', () => { it('should enable renewal', async () => { - config.set('multiMainSetup.enabled', false); + const globalConfig = mock({ multiMainSetup: { enabled: false } }); - await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); + await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init(); expect(LicenseManager).toHaveBeenCalledWith( expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }), @@ -211,7 +214,7 @@ describe('License', () => { it('should disable renewal', async () => { config.set('license.autoRenewEnabled', false); - await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); + await new License(mockLogger(), mock(), mock(), mock(), mock(), mock()).init(); expect(LicenseManager).toHaveBeenCalledWith( expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }), @@ -225,11 +228,11 @@ describe('License', () => { test.each(['unset', 'leader', 'follower'])( 'if %s status, should disable removal', async (status) => { - config.set('multiMainSetup.enabled', true); + const globalConfig = mock({ multiMainSetup: { enabled: true } }); config.set('multiMainSetup.instanceType', status); config.set('license.autoRenewEnabled', false); - await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); + await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init(); expect(LicenseManager).toHaveBeenCalledWith( expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }), @@ -240,11 +243,11 @@ describe('License', () => { describe('with `license.autoRenewEnabled` enabled', () => { test.each(['unset', 'follower'])('if %s status, should disable removal', async (status) => { - config.set('multiMainSetup.enabled', true); + const globalConfig = mock({ multiMainSetup: { enabled: true } }); config.set('multiMainSetup.instanceType', status); config.set('license.autoRenewEnabled', false); - await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); + await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init(); expect(LicenseManager).toHaveBeenCalledWith( expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }), @@ -252,10 +255,10 @@ describe('License', () => { }); it('if leader status, should enable renewal', async () => { - config.set('multiMainSetup.enabled', true); + const globalConfig = mock({ multiMainSetup: { enabled: true } }); config.set('multiMainSetup.instanceType', 'leader'); - await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); + await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init(); expect(LicenseManager).toHaveBeenCalledWith( expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }), @@ -267,7 +270,7 @@ describe('License', () => { describe('reinit', () => { it('should reinitialize license manager', async () => { - const license = new License(mockLogger(), mock(), mock(), mock(), mock()); + const license = new License(mockLogger(), mock(), mock(), mock(), mock(), mock()); await license.init(); const initSpy = jest.spyOn(license, 'init'); diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index 2473713891..66c26f00c6 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -3,7 +3,7 @@ import type { InstanceSettings } from 'n8n-core'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutionResponse } from '@/interfaces'; -import type { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee'; +import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import { OrchestrationService } from '@/services/orchestration.service'; import { WaitTracker } from '@/wait-tracker'; import { mockLogger } from '@test/mocking'; @@ -13,7 +13,7 @@ jest.useFakeTimers(); describe('WaitTracker', () => { const executionRepository = mock(); const multiMainSetup = mock(); - const orchestrationService = new OrchestrationService(mock(), multiMainSetup); + const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock()); const instanceSettings = mock({ isLeader: true }); const execution = mock({ diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index b46ef52ea4..7865739eec 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -1,5 +1,6 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ +import { GlobalConfig } from '@n8n/config'; import { Flags } from '@oclif/core'; import glob from 'fast-glob'; import { createReadStream, createWriteStream, existsSync } from 'fs'; @@ -240,7 +241,7 @@ export class Start extends BaseCommand { } if ( - config.getEnv('multiMainSetup.enabled') && + Container.get(GlobalConfig).multiMainSetup.enabled && !Container.get(License).isMultipleMainInstancesLicensed() ) { throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index d9d2f011fb..77ec770aa0 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -83,7 +83,7 @@ export class Webhook extends BaseCommand { } async run() { - if (config.getEnv('multiMainSetup.enabled')) { + if (this.globalConfig.multiMainSetup.enabled) { throw new ApplicationError( 'Webhook process cannot be started when multi-main setup is enabled.', ); diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index d2bb5297d4..31b11658e2 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -564,27 +564,6 @@ export const schema = { }, }, - multiMainSetup: { - enabled: { - doc: 'Whether to enable multi-main setup for queue mode (license required)', - format: Boolean, - default: false, - env: 'N8N_MULTI_MAIN_SETUP_ENABLED', - }, - ttl: { - doc: 'Time to live (in seconds) for leader key in multi-main setup', - format: Number, - default: 10, - env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL', - }, - interval: { - doc: 'Interval (in seconds) for leader check in multi-main setup', - format: Number, - default: 3, - env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL', - }, - }, - proxy_hops: { format: Number, default: 0, diff --git a/packages/cli/src/events/relays/telemetry.event-relay.ts b/packages/cli/src/events/relays/telemetry.event-relay.ts index 4cf0690eec..9e00e2e055 100644 --- a/packages/cli/src/events/relays/telemetry.event-relay.ts +++ b/packages/cli/src/events/relays/telemetry.event-relay.ts @@ -780,7 +780,7 @@ export class TelemetryEventRelay extends EventRelay { license_plan_name: this.license.getPlanName(), license_tenant_id: config.getEnv('license.tenantId'), binary_data_s3: isS3Available && isS3Selected && isS3Licensed, - multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'), + multi_main_setup_enabled: this.globalConfig.multiMainSetup.enabled, metrics: { metrics_enabled: this.globalConfig.endpoints.metrics.enable, metrics_category_default: this.globalConfig.endpoints.metrics.includeDefaultMetrics, diff --git a/packages/cli/src/license.ts b/packages/cli/src/license.ts index da7ab80313..36fb520e23 100644 --- a/packages/cli/src/license.ts +++ b/packages/cli/src/license.ts @@ -1,3 +1,4 @@ +import { GlobalConfig } from '@n8n/config'; import type { TEntitlement, TFeatures, TLicenseBlock } from '@n8n_io/license-sdk'; import { LicenseManager } from '@n8n_io/license-sdk'; import { InstanceSettings, ObjectStoreService } from 'n8n-core'; @@ -37,6 +38,7 @@ export class License { private readonly orchestrationService: OrchestrationService, private readonly settingsRepository: SettingsRepository, private readonly licenseMetricsService: LicenseMetricsService, + private readonly globalConfig: GlobalConfig, ) { this.logger = this.logger.withScope('license'); } @@ -54,7 +56,7 @@ export class License { * On becoming leader or follower, each will enable or disable renewal, respectively. * This ensures the mains do not cause a 429 (too many requests) on license init. */ - if (config.getEnv('multiMainSetup.enabled')) { + if (this.globalConfig.multiMainSetup.enabled) { return autoRenewEnabled && this.instanceSettings.isLeader; } @@ -136,7 +138,7 @@ export class License { async onFeatureChange(_features: TFeatures): Promise { this.logger.debug('License feature change detected', _features); - if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { + if (config.getEnv('executions.mode') === 'queue' && this.globalConfig.multiMainSetup.enabled) { const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as | boolean | undefined; diff --git a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts b/packages/cli/src/scaling/multi-main-setup.ee.ts similarity index 78% rename from packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts rename to packages/cli/src/scaling/multi-main-setup.ee.ts index 034a214765..76c964fc4f 100644 --- a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts +++ b/packages/cli/src/scaling/multi-main-setup.ee.ts @@ -1,3 +1,4 @@ +import { GlobalConfig } from '@n8n/config'; import { InstanceSettings } from 'n8n-core'; import { Service } from 'typedi'; @@ -9,10 +10,22 @@ import { RedisClientService } from '@/services/redis-client.service'; import { TypedEmitter } from '@/typed-emitter'; type MultiMainEvents = { + /** + * Emitted when this instance loses leadership. In response, its various + * services will stop triggers, pollers, pruning, wait-tracking, license + * renewal, queue recovery, etc. + */ 'leader-stepdown': never; + + /** + * Emitted when this instance gains leadership. In response, its various + * services will start triggers, pollers, pruning, wait-tracking, license + * renewal, queue recovery, etc. + */ 'leader-takeover': never; }; +/** Designates leader and followers when running multiple main processes. */ @Service() export class MultiMainSetup extends TypedEmitter { constructor( @@ -20,13 +33,15 @@ export class MultiMainSetup extends TypedEmitter { private readonly instanceSettings: InstanceSettings, private readonly publisher: Publisher, private readonly redisClientService: RedisClientService, + private readonly globalConfig: GlobalConfig, ) { super(); + this.logger = this.logger.withScope('scaling'); } private leaderKey: string; - private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl'); + private readonly leaderKeyTtl = this.globalConfig.multiMainSetup.ttl; private leaderCheckInterval: NodeJS.Timer | undefined; @@ -39,7 +54,7 @@ export class MultiMainSetup extends TypedEmitter { this.leaderCheckInterval = setInterval(async () => { await this.checkLeader(); - }, config.getEnv('multiMainSetup.interval') * TIME.SECOND); + }, this.globalConfig.multiMainSetup.interval * TIME.SECOND); } async shutdown() { @@ -69,7 +84,7 @@ export class MultiMainSetup extends TypedEmitter { if (this.instanceSettings.isLeader) { this.instanceSettings.markAsFollower(); - this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery + this.emit('leader-stepdown'); this.logger.warn('[Multi-main setup] Leader failed to renew leader key'); } @@ -84,9 +99,6 @@ export class MultiMainSetup extends TypedEmitter { this.instanceSettings.markAsFollower(); - /** - * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery - */ this.emit('leader-stepdown'); await this.tryBecomeLeader(); @@ -106,9 +118,6 @@ export class MultiMainSetup extends TypedEmitter { await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); - /** - * Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal, queue recovery - */ this.emit('leader-takeover'); } else { this.instanceSettings.markAsFollower(); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 61a2aff540..225badbf18 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -1,3 +1,4 @@ +import { GlobalConfig } from '@n8n/config'; import { InstanceSettings } from 'n8n-core'; import Container, { Service } from 'typedi'; @@ -5,13 +6,14 @@ import config from '@/config'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; -import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; +import { MultiMainSetup } from '../scaling/multi-main-setup.ee'; @Service() export class OrchestrationService { constructor( readonly instanceSettings: InstanceSettings, readonly multiMainSetup: MultiMainSetup, + readonly globalConfig: GlobalConfig, ) {} private publisher: Publisher; @@ -29,7 +31,7 @@ export class OrchestrationService { get isMultiMainSetupEnabled() { return ( config.getEnv('executions.mode') === 'queue' && - config.getEnv('multiMainSetup.enabled') && + this.globalConfig.multiMainSetup.enabled && this.instanceSettings.instanceType === 'main' && this.isMultiMainSetupLicensed ); diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index e9ceab5434..0859dddd39 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -1,3 +1,4 @@ +import { GlobalConfig } from '@n8n/config'; import { BinaryDataService, InstanceSettings } from 'n8n-core'; import { jsonStringify } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -31,6 +32,7 @@ export class PruningService { private readonly executionRepository: ExecutionRepository, private readonly binaryDataService: BinaryDataService, private readonly orchestrationService: OrchestrationService, + private readonly globalConfig: GlobalConfig, ) {} /** @@ -54,7 +56,7 @@ export class PruningService { return false; } - if (config.getEnv('multiMainSetup.enabled') && instanceType === 'main' && isFollower) { + if (this.globalConfig.multiMainSetup.enabled && instanceType === 'main' && isFollower) { return false; } diff --git a/packages/cli/test/integration/debug.controller.test.ts b/packages/cli/test/integration/debug.controller.test.ts index 47695e59aa..8ab58bd1a0 100644 --- a/packages/cli/test/integration/debug.controller.test.ts +++ b/packages/cli/test/integration/debug.controller.test.ts @@ -5,7 +5,7 @@ import { ActiveWorkflowManager } from '@/active-workflow-manager'; import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { generateNanoId } from '@/databases/utils/generators'; -import { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee'; +import { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import { createOwner } from './shared/db/users'; import { randomName } from './shared/random'; diff --git a/packages/cli/test/integration/pruning.service.test.ts b/packages/cli/test/integration/pruning.service.test.ts index c4d1957de0..af79640746 100644 --- a/packages/cli/test/integration/pruning.service.test.ts +++ b/packages/cli/test/integration/pruning.service.test.ts @@ -38,6 +38,7 @@ describe('softDeleteOnPruningCycle()', () => { Container.get(ExecutionRepository), mockInstance(BinaryDataService), mock(), + mock(), ); workflow = await createWorkflow();