chore(core): Bring multi-main setup in line with scaling services (#11289)

This commit is contained in:
Iván Ovejero 2024-10-17 10:47:17 +02:00 committed by GitHub
parent fbae17d8fb
commit be50a9ac44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 76 additions and 52 deletions

View file

@ -0,0 +1,16 @@
import { Config, Env } from '../decorators';
@Config
export class MultiMainSetupConfig {
/** Whether to enable multi-main setup (if licensed) for scaling mode. */
@Env('N8N_MULTI_MAIN_SETUP_ENABLED')
enabled: boolean = false;
/** Time to live (in seconds) for leader key in multi-main setup. */
@Env('N8N_MULTI_MAIN_SETUP_KEY_TTL')
ttl: number = 10;
/** Interval (in seconds) for leader check in multi-main setup. */
@Env('N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL')
interval: number = 3;
}

View file

@ -6,6 +6,7 @@ import { EventBusConfig } from './configs/event-bus.config';
import { ExternalSecretsConfig } from './configs/external-secrets.config'; import { ExternalSecretsConfig } from './configs/external-secrets.config';
import { ExternalStorageConfig } from './configs/external-storage.config'; import { ExternalStorageConfig } from './configs/external-storage.config';
import { LoggingConfig } from './configs/logging.config'; import { LoggingConfig } from './configs/logging.config';
import { MultiMainSetupConfig } from './configs/multi-main-setup.config';
import { NodesConfig } from './configs/nodes.config'; import { NodesConfig } from './configs/nodes.config';
import { PublicApiConfig } from './configs/public-api.config'; import { PublicApiConfig } from './configs/public-api.config';
import { TaskRunnersConfig } from './configs/runners.config'; import { TaskRunnersConfig } from './configs/runners.config';
@ -93,4 +94,7 @@ export class GlobalConfig {
@Nested @Nested
taskRunners: TaskRunnersConfig; taskRunners: TaskRunnersConfig;
@Nested
multiMainSetup: MultiMainSetupConfig;
} }

View file

@ -246,6 +246,11 @@ describe('GlobalConfig', () => {
}, },
scopes: [], scopes: [],
}, },
multiMainSetup: {
enabled: false,
ttl: 10,
interval: 3,
},
}; };
it('should use all default values when no env variables are defined', () => { it('should use all default values when no env variables are defined', () => {

View file

@ -1,3 +1,4 @@
import type { GlobalConfig } from '@n8n/config';
import { LicenseManager } from '@n8n_io/license-sdk'; import { LicenseManager } from '@n8n_io/license-sdk';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core'; import type { InstanceSettings } from 'n8n-core';
@ -31,7 +32,8 @@ describe('License', () => {
}); });
beforeEach(async () => { beforeEach(async () => {
license = new License(mockLogger(), instanceSettings, mock(), mock(), mock()); const globalConfig = mock<GlobalConfig>({ multiMainSetup: { enabled: false } });
license = new License(mockLogger(), instanceSettings, mock(), mock(), mock(), globalConfig);
await license.init(); await license.init();
}); });
@ -64,6 +66,7 @@ describe('License', () => {
mock(), mock(),
mock(), mock(),
mock(), mock(),
mock(),
); );
await license.init(); await license.init();
expect(LicenseManager).toHaveBeenCalledWith( expect(LicenseManager).toHaveBeenCalledWith(
@ -197,9 +200,9 @@ describe('License', () => {
describe('in single-main setup', () => { describe('in single-main setup', () => {
describe('with `license.autoRenewEnabled` enabled', () => { describe('with `license.autoRenewEnabled` enabled', () => {
it('should enable renewal', async () => { it('should enable renewal', async () => {
config.set('multiMainSetup.enabled', false); const globalConfig = mock<GlobalConfig>({ multiMainSetup: { enabled: false } });
await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
expect(LicenseManager).toHaveBeenCalledWith( expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }), expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@ -211,7 +214,7 @@ describe('License', () => {
it('should disable renewal', async () => { it('should disable renewal', async () => {
config.set('license.autoRenewEnabled', false); config.set('license.autoRenewEnabled', false);
await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); await new License(mockLogger(), mock(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith( expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }), expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -225,11 +228,11 @@ describe('License', () => {
test.each(['unset', 'leader', 'follower'])( test.each(['unset', 'leader', 'follower'])(
'if %s status, should disable removal', 'if %s status, should disable removal',
async (status) => { async (status) => {
config.set('multiMainSetup.enabled', true); const globalConfig = mock<GlobalConfig>({ multiMainSetup: { enabled: true } });
config.set('multiMainSetup.instanceType', status); config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false); config.set('license.autoRenewEnabled', false);
await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
expect(LicenseManager).toHaveBeenCalledWith( expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }), expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -240,11 +243,11 @@ describe('License', () => {
describe('with `license.autoRenewEnabled` enabled', () => { describe('with `license.autoRenewEnabled` enabled', () => {
test.each(['unset', 'follower'])('if %s status, should disable removal', async (status) => { test.each(['unset', 'follower'])('if %s status, should disable removal', async (status) => {
config.set('multiMainSetup.enabled', true); const globalConfig = mock<GlobalConfig>({ multiMainSetup: { enabled: true } });
config.set('multiMainSetup.instanceType', status); config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false); config.set('license.autoRenewEnabled', false);
await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
expect(LicenseManager).toHaveBeenCalledWith( expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }), expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -252,10 +255,10 @@ describe('License', () => {
}); });
it('if leader status, should enable renewal', async () => { it('if leader status, should enable renewal', async () => {
config.set('multiMainSetup.enabled', true); const globalConfig = mock<GlobalConfig>({ multiMainSetup: { enabled: true } });
config.set('multiMainSetup.instanceType', 'leader'); config.set('multiMainSetup.instanceType', 'leader');
await new License(mockLogger(), mock(), mock(), mock(), mock()).init(); await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
expect(LicenseManager).toHaveBeenCalledWith( expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }), expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@ -267,7 +270,7 @@ describe('License', () => {
describe('reinit', () => { describe('reinit', () => {
it('should reinitialize license manager', async () => { it('should reinitialize license manager', async () => {
const license = new License(mockLogger(), mock(), mock(), mock(), mock()); const license = new License(mockLogger(), mock(), mock(), mock(), mock(), mock());
await license.init(); await license.init();
const initSpy = jest.spyOn(license, 'init'); const initSpy = jest.spyOn(license, 'init');

View file

@ -3,7 +3,7 @@ import type { InstanceSettings } from 'n8n-core';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces'; import type { IExecutionResponse } from '@/interfaces';
import type { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee'; import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { WaitTracker } from '@/wait-tracker'; import { WaitTracker } from '@/wait-tracker';
import { mockLogger } from '@test/mocking'; import { mockLogger } from '@test/mocking';
@ -13,7 +13,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => { describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>(); const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>(); const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), multiMainSetup); const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock());
const instanceSettings = mock<InstanceSettings>({ isLeader: true }); const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const execution = mock<IExecutionResponse>({ const execution = mock<IExecutionResponse>({

View file

@ -1,5 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { GlobalConfig } from '@n8n/config';
import { Flags } from '@oclif/core'; import { Flags } from '@oclif/core';
import glob from 'fast-glob'; import glob from 'fast-glob';
import { createReadStream, createWriteStream, existsSync } from 'fs'; import { createReadStream, createWriteStream, existsSync } from 'fs';
@ -240,7 +241,7 @@ export class Start extends BaseCommand {
} }
if ( if (
config.getEnv('multiMainSetup.enabled') && Container.get(GlobalConfig).multiMainSetup.enabled &&
!Container.get(License).isMultipleMainInstancesLicensed() !Container.get(License).isMultipleMainInstancesLicensed()
) { ) {
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);

View file

@ -83,7 +83,7 @@ export class Webhook extends BaseCommand {
} }
async run() { async run() {
if (config.getEnv('multiMainSetup.enabled')) { if (this.globalConfig.multiMainSetup.enabled) {
throw new ApplicationError( throw new ApplicationError(
'Webhook process cannot be started when multi-main setup is enabled.', 'Webhook process cannot be started when multi-main setup is enabled.',
); );

View file

@ -564,27 +564,6 @@ export const schema = {
}, },
}, },
multiMainSetup: {
enabled: {
doc: 'Whether to enable multi-main setup for queue mode (license required)',
format: Boolean,
default: false,
env: 'N8N_MULTI_MAIN_SETUP_ENABLED',
},
ttl: {
doc: 'Time to live (in seconds) for leader key in multi-main setup',
format: Number,
default: 10,
env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL',
},
interval: {
doc: 'Interval (in seconds) for leader check in multi-main setup',
format: Number,
default: 3,
env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL',
},
},
proxy_hops: { proxy_hops: {
format: Number, format: Number,
default: 0, default: 0,

View file

@ -780,7 +780,7 @@ export class TelemetryEventRelay extends EventRelay {
license_plan_name: this.license.getPlanName(), license_plan_name: this.license.getPlanName(),
license_tenant_id: config.getEnv('license.tenantId'), license_tenant_id: config.getEnv('license.tenantId'),
binary_data_s3: isS3Available && isS3Selected && isS3Licensed, binary_data_s3: isS3Available && isS3Selected && isS3Licensed,
multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'), multi_main_setup_enabled: this.globalConfig.multiMainSetup.enabled,
metrics: { metrics: {
metrics_enabled: this.globalConfig.endpoints.metrics.enable, metrics_enabled: this.globalConfig.endpoints.metrics.enable,
metrics_category_default: this.globalConfig.endpoints.metrics.includeDefaultMetrics, metrics_category_default: this.globalConfig.endpoints.metrics.includeDefaultMetrics,

View file

@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import type { TEntitlement, TFeatures, TLicenseBlock } from '@n8n_io/license-sdk'; import type { TEntitlement, TFeatures, TLicenseBlock } from '@n8n_io/license-sdk';
import { LicenseManager } from '@n8n_io/license-sdk'; import { LicenseManager } from '@n8n_io/license-sdk';
import { InstanceSettings, ObjectStoreService } from 'n8n-core'; import { InstanceSettings, ObjectStoreService } from 'n8n-core';
@ -37,6 +38,7 @@ export class License {
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
private readonly settingsRepository: SettingsRepository, private readonly settingsRepository: SettingsRepository,
private readonly licenseMetricsService: LicenseMetricsService, private readonly licenseMetricsService: LicenseMetricsService,
private readonly globalConfig: GlobalConfig,
) { ) {
this.logger = this.logger.withScope('license'); this.logger = this.logger.withScope('license');
} }
@ -54,7 +56,7 @@ export class License {
* On becoming leader or follower, each will enable or disable renewal, respectively. * On becoming leader or follower, each will enable or disable renewal, respectively.
* This ensures the mains do not cause a 429 (too many requests) on license init. * This ensures the mains do not cause a 429 (too many requests) on license init.
*/ */
if (config.getEnv('multiMainSetup.enabled')) { if (this.globalConfig.multiMainSetup.enabled) {
return autoRenewEnabled && this.instanceSettings.isLeader; return autoRenewEnabled && this.instanceSettings.isLeader;
} }
@ -136,7 +138,7 @@ export class License {
async onFeatureChange(_features: TFeatures): Promise<void> { async onFeatureChange(_features: TFeatures): Promise<void> {
this.logger.debug('License feature change detected', _features); this.logger.debug('License feature change detected', _features);
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { if (config.getEnv('executions.mode') === 'queue' && this.globalConfig.multiMainSetup.enabled) {
const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as
| boolean | boolean
| undefined; | undefined;

View file

@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import { InstanceSettings } from 'n8n-core'; import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi'; import { Service } from 'typedi';
@ -9,10 +10,22 @@ import { RedisClientService } from '@/services/redis-client.service';
import { TypedEmitter } from '@/typed-emitter'; import { TypedEmitter } from '@/typed-emitter';
type MultiMainEvents = { type MultiMainEvents = {
/**
* Emitted when this instance loses leadership. In response, its various
* services will stop triggers, pollers, pruning, wait-tracking, license
* renewal, queue recovery, etc.
*/
'leader-stepdown': never; 'leader-stepdown': never;
/**
* Emitted when this instance gains leadership. In response, its various
* services will start triggers, pollers, pruning, wait-tracking, license
* renewal, queue recovery, etc.
*/
'leader-takeover': never; 'leader-takeover': never;
}; };
/** Designates leader and followers when running multiple main processes. */
@Service() @Service()
export class MultiMainSetup extends TypedEmitter<MultiMainEvents> { export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
constructor( constructor(
@ -20,13 +33,15 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
private readonly publisher: Publisher, private readonly publisher: Publisher,
private readonly redisClientService: RedisClientService, private readonly redisClientService: RedisClientService,
private readonly globalConfig: GlobalConfig,
) { ) {
super(); super();
this.logger = this.logger.withScope('scaling');
} }
private leaderKey: string; private leaderKey: string;
private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl'); private readonly leaderKeyTtl = this.globalConfig.multiMainSetup.ttl;
private leaderCheckInterval: NodeJS.Timer | undefined; private leaderCheckInterval: NodeJS.Timer | undefined;
@ -39,7 +54,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
this.leaderCheckInterval = setInterval(async () => { this.leaderCheckInterval = setInterval(async () => {
await this.checkLeader(); await this.checkLeader();
}, config.getEnv('multiMainSetup.interval') * TIME.SECOND); }, this.globalConfig.multiMainSetup.interval * TIME.SECOND);
} }
async shutdown() { async shutdown() {
@ -69,7 +84,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
if (this.instanceSettings.isLeader) { if (this.instanceSettings.isLeader) {
this.instanceSettings.markAsFollower(); this.instanceSettings.markAsFollower();
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery this.emit('leader-stepdown');
this.logger.warn('[Multi-main setup] Leader failed to renew leader key'); this.logger.warn('[Multi-main setup] Leader failed to renew leader key');
} }
@ -84,9 +99,6 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
this.instanceSettings.markAsFollower(); this.instanceSettings.markAsFollower();
/**
* Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery
*/
this.emit('leader-stepdown'); this.emit('leader-stepdown');
await this.tryBecomeLeader(); await this.tryBecomeLeader();
@ -106,9 +118,6 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
/**
* Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal, queue recovery
*/
this.emit('leader-takeover'); this.emit('leader-takeover');
} else { } else {
this.instanceSettings.markAsFollower(); this.instanceSettings.markAsFollower();

View file

@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import { InstanceSettings } from 'n8n-core'; import { InstanceSettings } from 'n8n-core';
import Container, { Service } from 'typedi'; import Container, { Service } from 'typedi';
@ -5,13 +6,14 @@ import config from '@/config';
import type { Publisher } from '@/scaling/pubsub/publisher.service'; import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; import { MultiMainSetup } from '../scaling/multi-main-setup.ee';
@Service() @Service()
export class OrchestrationService { export class OrchestrationService {
constructor( constructor(
readonly instanceSettings: InstanceSettings, readonly instanceSettings: InstanceSettings,
readonly multiMainSetup: MultiMainSetup, readonly multiMainSetup: MultiMainSetup,
readonly globalConfig: GlobalConfig,
) {} ) {}
private publisher: Publisher; private publisher: Publisher;
@ -29,7 +31,7 @@ export class OrchestrationService {
get isMultiMainSetupEnabled() { get isMultiMainSetupEnabled() {
return ( return (
config.getEnv('executions.mode') === 'queue' && config.getEnv('executions.mode') === 'queue' &&
config.getEnv('multiMainSetup.enabled') && this.globalConfig.multiMainSetup.enabled &&
this.instanceSettings.instanceType === 'main' && this.instanceSettings.instanceType === 'main' &&
this.isMultiMainSetupLicensed this.isMultiMainSetupLicensed
); );

View file

@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import { BinaryDataService, InstanceSettings } from 'n8n-core'; import { BinaryDataService, InstanceSettings } from 'n8n-core';
import { jsonStringify } from 'n8n-workflow'; import { jsonStringify } from 'n8n-workflow';
import { Service } from 'typedi'; import { Service } from 'typedi';
@ -31,6 +32,7 @@ export class PruningService {
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService, private readonly binaryDataService: BinaryDataService,
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
private readonly globalConfig: GlobalConfig,
) {} ) {}
/** /**
@ -54,7 +56,7 @@ export class PruningService {
return false; return false;
} }
if (config.getEnv('multiMainSetup.enabled') && instanceType === 'main' && isFollower) { if (this.globalConfig.multiMainSetup.enabled && instanceType === 'main' && isFollower) {
return false; return false;
} }

View file

@ -5,7 +5,7 @@ import { ActiveWorkflowManager } from '@/active-workflow-manager';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { generateNanoId } from '@/databases/utils/generators'; import { generateNanoId } from '@/databases/utils/generators';
import { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee'; import { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { createOwner } from './shared/db/users'; import { createOwner } from './shared/db/users';
import { randomName } from './shared/random'; import { randomName } from './shared/random';

View file

@ -38,6 +38,7 @@ describe('softDeleteOnPruningCycle()', () => {
Container.get(ExecutionRepository), Container.get(ExecutionRepository),
mockInstance(BinaryDataService), mockInstance(BinaryDataService),
mock(), mock(),
mock(),
); );
workflow = await createWorkflow(); workflow = await createWorkflow();