diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index 9a2740ce7c..643da18fd2 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -55,7 +55,7 @@ export class License { * This ensures the mains do not cause a 429 (too many requests) on license init. */ if (config.getEnv('multiMainSetup.enabled')) { - return autoRenewEnabled && config.getEnv('instanceRole') === 'leader'; + return autoRenewEnabled && this.instanceSettings.isLeader; } return autoRenewEnabled; diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 4d7cd888b4..63349ab6d1 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -186,7 +186,7 @@ export class Start extends BaseCommand { await this.initOrchestration(); this.logger.debug('Orchestration init complete'); - if (!config.getEnv('license.autoRenewEnabled') && config.getEnv('instanceRole') === 'leader') { + if (!config.getEnv('license.autoRenewEnabled') && this.instanceSettings.isLeader) { this.logger.warn( 'Automatic license renewal is disabled. The license will not renew automatically, and access to licensed features may be lost!', ); @@ -210,7 +210,7 @@ export class Start extends BaseCommand { async initOrchestration() { if (config.getEnv('executions.mode') === 'regular') { - config.set('instanceRole', 'leader'); + this.instanceSettings.markAsLeader(); return; } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index dc63f53099..f99c14885d 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -759,12 +759,6 @@ export const schema = { }, }, - instanceRole: { - doc: 'Always `leader` in single-main setup. `leader` or `follower` in multi-main setup.', - format: ['unset', 'leader', 'follower'] as const, - default: 'unset', // only until Start.initOrchestration - }, - multiMainSetup: { enabled: { doc: 'Whether to enable multi-main setup for queue mode (license required)', diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index 7e33c6ac74..39a84d27a7 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -1,6 +1,7 @@ import Container from 'typedi'; import { stringify } from 'flatted'; import { randomInt } from 'n8n-workflow'; +import { InstanceSettings } from 'n8n-core'; import { mockInstance } from '@test/mocking'; import { createWorkflow } from '@test-integration/db/workflows'; @@ -21,38 +22,37 @@ import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNod import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants'; import { setupMessages } from './utils'; -import type { EventService } from '@/eventbus/event.service'; import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; -import type { Logger } from '@/Logger'; describe('ExecutionRecoveryService', () => { - let push: Push; + const push = mockInstance(Push); + mockInstance(InternalHooks); + const instanceSettings = new InstanceSettings(); + let executionRecoveryService: ExecutionRecoveryService; let orchestrationService: OrchestrationService; let executionRepository: ExecutionRepository; beforeAll(async () => { await testDb.init(); - push = mockInstance(Push); executionRepository = Container.get(ExecutionRepository); orchestrationService = Container.get(OrchestrationService); - mockInstance(InternalHooks); executionRecoveryService = new ExecutionRecoveryService( - mock(), + mock(), + instanceSettings, push, executionRepository, orchestrationService, - mock(), + mock(), ); }); beforeEach(() => { - config.set('instanceRole', 'leader'); + instanceSettings.markAsLeader(); }); afterEach(async () => { - config.load(config.default); jest.restoreAllMocks(); await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']); executionRecoveryService.shutdown(); @@ -69,7 +69,6 @@ describe('ExecutionRecoveryService', () => { * Arrange */ config.set('executions.mode', 'queue'); - jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true); const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); /** @@ -88,7 +87,7 @@ describe('ExecutionRecoveryService', () => { * Arrange */ config.set('executions.mode', 'queue'); - jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false); + instanceSettings.markAsFollower(); const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); /** @@ -130,7 +129,7 @@ describe('ExecutionRecoveryService', () => { /** * Arrange */ - config.set('instanceRole', 'follower'); + instanceSettings.markAsFollower(); // @ts-expect-error Private method const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); const messages = setupMessages('123', 'Some workflow'); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 615c65ea6e..152d4ad4f2 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -5,6 +5,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle import type { DateTime } from 'luxon'; import type { IRun, ITaskData } from 'n8n-workflow'; +import { InstanceSettings } from 'n8n-core'; import type { EventMessageTypes } from '../eventbus/EventMessageClasses'; import type { IExecutionResponse } from '@/Interfaces'; import { NodeCrashedError } from '@/errors/node-crashed.error'; @@ -24,6 +25,7 @@ import { EventService } from '@/eventbus/event.service'; export class ExecutionRecoveryService { constructor( private readonly logger: Logger, + private readonly instanceSettings: InstanceSettings, private readonly push: Push, private readonly executionRepository: ExecutionRepository, private readonly orchestrationService: OrchestrationService, @@ -36,10 +38,10 @@ export class ExecutionRecoveryService { init() { if (config.getEnv('executions.mode') === 'regular') return; - const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService; - + const { isLeader } = this.instanceSettings; if (isLeader) this.scheduleQueueRecovery(); + const { isMultiMainSetupEnabled } = this.orchestrationService; if (isMultiMainSetupEnabled) { this.orchestrationService.multiMainSetup .on('leader-takeover', () => this.scheduleQueueRecovery()) @@ -58,7 +60,7 @@ export class ExecutionRecoveryService { * Recover key properties of a truncated execution using event logs. */ async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) { - if (this.orchestrationService.isFollower) return; + if (this.instanceSettings.isFollower) return; const amendedExecution = await this.amend(executionId, messages); @@ -319,7 +321,7 @@ export class ExecutionRecoveryService { private shouldScheduleQueueRecovery() { return ( config.getEnv('executions.mode') === 'queue' && - config.getEnv('instanceRole') === 'leader' && + this.instanceSettings.isLeader && !this.isShuttingDown ); } diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index bfc1d140fc..283470f4d2 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -7,11 +7,13 @@ import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/Redis import { RedisService } from './redis.service'; import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee'; import type { WorkflowActivateMode } from 'n8n-workflow'; +import { InstanceSettings } from 'n8n-core'; @Service() export class OrchestrationService { constructor( private readonly logger: Logger, + private readonly instanceSettings: InstanceSettings, private readonly redisService: RedisService, readonly multiMainSetup: MultiMainSetup, ) {} @@ -43,12 +45,14 @@ export class OrchestrationService { return config.getEnv('redis.queueModeId'); } + /** @deprecated use InstanceSettings.isLeader */ get isLeader() { - return config.getEnv('instanceRole') === 'leader'; + return this.instanceSettings.isLeader; } + /** @deprecated use InstanceSettings.isFollower */ get isFollower() { - return config.getEnv('instanceRole') !== 'leader'; + return this.instanceSettings.isFollower; } sanityCheck() { @@ -63,7 +67,7 @@ export class OrchestrationService { if (this.isMultiMainSetupEnabled) { await this.multiMainSetup.init(); } else { - config.set('instanceRole', 'leader'); + this.instanceSettings.markAsLeader(); } this.isInitialized = true; diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index cabcf5996b..89c6ef725a 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -1,6 +1,7 @@ import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; +import { InstanceSettings } from 'n8n-core'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; import { Logger } from '@/Logger'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; @@ -16,6 +17,7 @@ type MultiMainEvents = { export class MultiMainSetup extends TypedEmitter { constructor( private readonly logger: Logger, + private readonly instanceSettings: InstanceSettings, private readonly redisPublisher: RedisServicePubSubPublisher, private readonly redisClientService: RedisClientService, ) { @@ -50,7 +52,7 @@ export class MultiMainSetup extends TypedEmitter { async shutdown() { clearInterval(this.leaderCheckInterval); - const isLeader = config.getEnv('instanceRole') === 'leader'; + const { isLeader } = this.instanceSettings; if (isLeader) await this.redisPublisher.clear(this.leaderKey); } @@ -69,8 +71,8 @@ export class MultiMainSetup extends TypedEmitter { if (leaderId && leaderId !== this.instanceId) { this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`); - if (config.getEnv('instanceRole') === 'leader') { - config.set('instanceRole', 'follower'); + if (this.instanceSettings.isLeader) { + this.instanceSettings.markAsFollower(); this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery @@ -85,7 +87,7 @@ export class MultiMainSetup extends TypedEmitter { `[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`, ); - config.set('instanceRole', 'follower'); + this.instanceSettings.markAsFollower(); /** * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery @@ -106,7 +108,7 @@ export class MultiMainSetup extends TypedEmitter { if (keySetSuccessfully) { this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`); - config.set('instanceRole', 'leader'); + this.instanceSettings.markAsLeader(); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); @@ -115,7 +117,7 @@ export class MultiMainSetup extends TypedEmitter { */ this.emit('leader-takeover'); } else { - config.set('instanceRole', 'follower'); + this.instanceSettings.markAsFollower(); } } diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index 78c0aad037..7f824836ae 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -1,5 +1,5 @@ import { Service } from 'typedi'; -import { BinaryDataService } from 'n8n-core'; +import { BinaryDataService, InstanceSettings } from 'n8n-core'; import { inTest, TIME } from '@/constants'; import config from '@/config'; import { ExecutionRepository } from '@db/repositories/execution.repository'; @@ -25,6 +25,7 @@ export class PruningService { constructor( private readonly logger: Logger, + private readonly instanceSettings: InstanceSettings, private readonly executionRepository: ExecutionRepository, private readonly binaryDataService: BinaryDataService, private readonly orchestrationService: OrchestrationService, @@ -56,7 +57,7 @@ export class PruningService { if ( config.getEnv('multiMainSetup.enabled') && config.getEnv('generic.instanceType') === 'main' && - config.getEnv('instanceRole') === 'follower' + this.instanceSettings.isFollower ) { return false; } diff --git a/packages/cli/test/integration/pruning.service.test.ts b/packages/cli/test/integration/pruning.service.test.ts index 09a43a5b5b..37d218c09d 100644 --- a/packages/cli/test/integration/pruning.service.test.ts +++ b/packages/cli/test/integration/pruning.service.test.ts @@ -1,5 +1,5 @@ import config from '@/config'; -import { BinaryDataService } from 'n8n-core'; +import { BinaryDataService, InstanceSettings } from 'n8n-core'; import type { ExecutionStatus } from 'n8n-workflow'; import Container from 'typedi'; @@ -15,10 +15,11 @@ import { mockInstance } from '../shared/mocking'; import { createWorkflow } from './shared/db/workflows'; import { createExecution, createSuccessfulExecution } from './shared/db/executions'; import { mock } from 'jest-mock-extended'; -import type { OrchestrationService } from '@/services/orchestration.service'; describe('softDeleteOnPruningCycle()', () => { let pruningService: PruningService; + const instanceSettings = new InstanceSettings(); + instanceSettings.markAsLeader(); const now = new Date(); const yesterday = new Date(Date.now() - TIME.DAY); @@ -29,9 +30,10 @@ describe('softDeleteOnPruningCycle()', () => { pruningService = new PruningService( mockInstance(Logger), + instanceSettings, Container.get(ExecutionRepository), mockInstance(BinaryDataService), - mock(), + mock(), ); workflow = await createWorkflow(); diff --git a/packages/cli/test/unit/WaitTracker.test.ts b/packages/cli/test/unit/WaitTracker.test.ts index 0f8464e18d..fb51d2e25b 100644 --- a/packages/cli/test/unit/WaitTracker.test.ts +++ b/packages/cli/test/unit/WaitTracker.test.ts @@ -10,7 +10,7 @@ jest.useFakeTimers(); describe('WaitTracker', () => { const executionRepository = mock(); const multiMainSetup = mock(); - const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup); + const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup); const execution = mock({ id: '123', diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 75c7c06e40..0b63f74344 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -1,4 +1,9 @@ import Container from 'typedi'; +import type Redis from 'ioredis'; +import { mock } from 'jest-mock-extended'; +import { InstanceSettings } from 'n8n-core'; +import type { WorkflowActivateMode } from 'n8n-workflow'; + import config from '@/config'; import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; @@ -13,11 +18,9 @@ import { Logger } from '@/Logger'; import { Push } from '@/push'; import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { mockInstance } from '../../shared/mocking'; -import type { WorkflowActivateMode } from 'n8n-workflow'; import { RedisClientService } from '@/services/redis/redis-client.service'; -import type Redis from 'ioredis'; -import { mock } from 'jest-mock-extended'; +const instanceSettings = Container.get(InstanceSettings); const redisClientService = mockInstance(RedisClientService); const mockRedisClient = mock(); redisClientService.createClient.mockReturnValue(mockRedisClient); @@ -72,6 +75,10 @@ describe('Orchestration Service', () => { queueModeId = config.get('redis.queueModeId'); }); + beforeEach(() => { + instanceSettings.markAsLeader(); + }); + afterAll(async () => { jest.mock('@/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); jest.mock('@/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); @@ -141,13 +148,10 @@ describe('Orchestration Service', () => { ); expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2); expect(res1!.payload).toBeUndefined(); - expect((res2!.payload as { result: string }).result).toEqual('debounced'); + expect(res2!.payload).toEqual({ result: 'debounced' }); }); describe('shouldAddWebhooks', () => { - beforeEach(() => { - config.set('instanceRole', 'leader'); - }); test('should return true for init', () => { // We want to ensure that webhooks are populated on init // more https://github.com/n8n-io/n8n/pull/8830 @@ -169,7 +173,7 @@ describe('Orchestration Service', () => { }); test('should return false for update or activate when not leader', () => { - config.set('instanceRole', 'follower'); + instanceSettings.markAsFollower(); const modes = ['update', 'activate'] as WorkflowActivateMode[]; for (const mode of modes) { const result = os.shouldAddWebhooks(mode); diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index e8ab9aa553..f75e1df712 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -14,6 +14,8 @@ interface WritableSettings { type Settings = ReadOnlySettings & WritableSettings; +type InstanceRole = 'unset' | 'leader' | 'follower'; + const inTest = process.env.NODE_ENV === 'test'; @Service() @@ -38,6 +40,25 @@ export class InstanceSettings { readonly instanceId = this.generateInstanceId(); + /** Always `leader` in single-main setup. `leader` or `follower` in multi-main setup. */ + private instanceRole: InstanceRole = 'unset'; + + get isLeader() { + return this.instanceRole === 'leader'; + } + + markAsLeader() { + this.instanceRole = 'leader'; + } + + get isFollower() { + return this.instanceRole === 'follower'; + } + + markAsFollower() { + this.instanceRole = 'follower'; + } + get encryptionKey() { return this.settings.encryptionKey; }