refactor(core): Move instanceRole to InstanceSettings (no-changelog) (#10242)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-08-02 15:18:33 +02:00 committed by GitHub
parent 7056e50b00
commit 0faf46f4f8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 77 additions and 48 deletions

View file

@ -55,7 +55,7 @@ export class License {
* This ensures the mains do not cause a 429 (too many requests) on license init. * This ensures the mains do not cause a 429 (too many requests) on license init.
*/ */
if (config.getEnv('multiMainSetup.enabled')) { if (config.getEnv('multiMainSetup.enabled')) {
return autoRenewEnabled && config.getEnv('instanceRole') === 'leader'; return autoRenewEnabled && this.instanceSettings.isLeader;
} }
return autoRenewEnabled; return autoRenewEnabled;

View file

@ -186,7 +186,7 @@ export class Start extends BaseCommand {
await this.initOrchestration(); await this.initOrchestration();
this.logger.debug('Orchestration init complete'); this.logger.debug('Orchestration init complete');
if (!config.getEnv('license.autoRenewEnabled') && config.getEnv('instanceRole') === 'leader') { if (!config.getEnv('license.autoRenewEnabled') && this.instanceSettings.isLeader) {
this.logger.warn( this.logger.warn(
'Automatic license renewal is disabled. The license will not renew automatically, and access to licensed features may be lost!', 'Automatic license renewal is disabled. The license will not renew automatically, and access to licensed features may be lost!',
); );
@ -210,7 +210,7 @@ export class Start extends BaseCommand {
async initOrchestration() { async initOrchestration() {
if (config.getEnv('executions.mode') === 'regular') { if (config.getEnv('executions.mode') === 'regular') {
config.set('instanceRole', 'leader'); this.instanceSettings.markAsLeader();
return; return;
} }

View file

@ -759,12 +759,6 @@ export const schema = {
}, },
}, },
instanceRole: {
doc: 'Always `leader` in single-main setup. `leader` or `follower` in multi-main setup.',
format: ['unset', 'leader', 'follower'] as const,
default: 'unset', // only until Start.initOrchestration
},
multiMainSetup: { multiMainSetup: {
enabled: { enabled: {
doc: 'Whether to enable multi-main setup for queue mode (license required)', doc: 'Whether to enable multi-main setup for queue mode (license required)',

View file

@ -1,6 +1,7 @@
import Container from 'typedi'; import Container from 'typedi';
import { stringify } from 'flatted'; import { stringify } from 'flatted';
import { randomInt } from 'n8n-workflow'; import { randomInt } from 'n8n-workflow';
import { InstanceSettings } from 'n8n-core';
import { mockInstance } from '@test/mocking'; import { mockInstance } from '@test/mocking';
import { createWorkflow } from '@test-integration/db/workflows'; import { createWorkflow } from '@test-integration/db/workflows';
@ -21,38 +22,37 @@ import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNod
import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants'; import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants';
import { setupMessages } from './utils'; import { setupMessages } from './utils';
import type { EventService } from '@/eventbus/event.service';
import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
import type { Logger } from '@/Logger';
describe('ExecutionRecoveryService', () => { describe('ExecutionRecoveryService', () => {
let push: Push; const push = mockInstance(Push);
mockInstance(InternalHooks);
const instanceSettings = new InstanceSettings();
let executionRecoveryService: ExecutionRecoveryService; let executionRecoveryService: ExecutionRecoveryService;
let orchestrationService: OrchestrationService; let orchestrationService: OrchestrationService;
let executionRepository: ExecutionRepository; let executionRepository: ExecutionRepository;
beforeAll(async () => { beforeAll(async () => {
await testDb.init(); await testDb.init();
push = mockInstance(Push);
executionRepository = Container.get(ExecutionRepository); executionRepository = Container.get(ExecutionRepository);
orchestrationService = Container.get(OrchestrationService); orchestrationService = Container.get(OrchestrationService);
mockInstance(InternalHooks);
executionRecoveryService = new ExecutionRecoveryService( executionRecoveryService = new ExecutionRecoveryService(
mock<Logger>(), mock(),
instanceSettings,
push, push,
executionRepository, executionRepository,
orchestrationService, orchestrationService,
mock<EventService>(), mock(),
); );
}); });
beforeEach(() => { beforeEach(() => {
config.set('instanceRole', 'leader'); instanceSettings.markAsLeader();
}); });
afterEach(async () => { afterEach(async () => {
config.load(config.default);
jest.restoreAllMocks(); jest.restoreAllMocks();
await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']); await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
executionRecoveryService.shutdown(); executionRecoveryService.shutdown();
@ -69,7 +69,6 @@ describe('ExecutionRecoveryService', () => {
* Arrange * Arrange
*/ */
config.set('executions.mode', 'queue'); config.set('executions.mode', 'queue');
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
/** /**
@ -88,7 +87,7 @@ describe('ExecutionRecoveryService', () => {
* Arrange * Arrange
*/ */
config.set('executions.mode', 'queue'); config.set('executions.mode', 'queue');
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false); instanceSettings.markAsFollower();
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
/** /**
@ -130,7 +129,7 @@ describe('ExecutionRecoveryService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('instanceRole', 'follower'); instanceSettings.markAsFollower();
// @ts-expect-error Private method // @ts-expect-error Private method
const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); const amendSpy = jest.spyOn(executionRecoveryService, 'amend');
const messages = setupMessages('123', 'Some workflow'); const messages = setupMessages('123', 'Some workflow');

View file

@ -5,6 +5,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
import type { DateTime } from 'luxon'; import type { DateTime } from 'luxon';
import type { IRun, ITaskData } from 'n8n-workflow'; import type { IRun, ITaskData } from 'n8n-workflow';
import { InstanceSettings } from 'n8n-core';
import type { EventMessageTypes } from '../eventbus/EventMessageClasses'; import type { EventMessageTypes } from '../eventbus/EventMessageClasses';
import type { IExecutionResponse } from '@/Interfaces'; import type { IExecutionResponse } from '@/Interfaces';
import { NodeCrashedError } from '@/errors/node-crashed.error'; import { NodeCrashedError } from '@/errors/node-crashed.error';
@ -24,6 +25,7 @@ import { EventService } from '@/eventbus/event.service';
export class ExecutionRecoveryService { export class ExecutionRecoveryService {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly push: Push, private readonly push: Push,
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
@ -36,10 +38,10 @@ export class ExecutionRecoveryService {
init() { init() {
if (config.getEnv('executions.mode') === 'regular') return; if (config.getEnv('executions.mode') === 'regular') return;
const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService; const { isLeader } = this.instanceSettings;
if (isLeader) this.scheduleQueueRecovery(); if (isLeader) this.scheduleQueueRecovery();
const { isMultiMainSetupEnabled } = this.orchestrationService;
if (isMultiMainSetupEnabled) { if (isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup this.orchestrationService.multiMainSetup
.on('leader-takeover', () => this.scheduleQueueRecovery()) .on('leader-takeover', () => this.scheduleQueueRecovery())
@ -58,7 +60,7 @@ export class ExecutionRecoveryService {
* Recover key properties of a truncated execution using event logs. * Recover key properties of a truncated execution using event logs.
*/ */
async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) { async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) {
if (this.orchestrationService.isFollower) return; if (this.instanceSettings.isFollower) return;
const amendedExecution = await this.amend(executionId, messages); const amendedExecution = await this.amend(executionId, messages);
@ -319,7 +321,7 @@ export class ExecutionRecoveryService {
private shouldScheduleQueueRecovery() { private shouldScheduleQueueRecovery() {
return ( return (
config.getEnv('executions.mode') === 'queue' && config.getEnv('executions.mode') === 'queue' &&
config.getEnv('instanceRole') === 'leader' && this.instanceSettings.isLeader &&
!this.isShuttingDown !this.isShuttingDown
); );
} }

View file

@ -7,11 +7,13 @@ import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/Redis
import { RedisService } from './redis.service'; import { RedisService } from './redis.service';
import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee'; import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee';
import type { WorkflowActivateMode } from 'n8n-workflow'; import type { WorkflowActivateMode } from 'n8n-workflow';
import { InstanceSettings } from 'n8n-core';
@Service() @Service()
export class OrchestrationService { export class OrchestrationService {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly redisService: RedisService, private readonly redisService: RedisService,
readonly multiMainSetup: MultiMainSetup, readonly multiMainSetup: MultiMainSetup,
) {} ) {}
@ -43,12 +45,14 @@ export class OrchestrationService {
return config.getEnv('redis.queueModeId'); return config.getEnv('redis.queueModeId');
} }
/** @deprecated use InstanceSettings.isLeader */
get isLeader() { get isLeader() {
return config.getEnv('instanceRole') === 'leader'; return this.instanceSettings.isLeader;
} }
/** @deprecated use InstanceSettings.isFollower */
get isFollower() { get isFollower() {
return config.getEnv('instanceRole') !== 'leader'; return this.instanceSettings.isFollower;
} }
sanityCheck() { sanityCheck() {
@ -63,7 +67,7 @@ export class OrchestrationService {
if (this.isMultiMainSetupEnabled) { if (this.isMultiMainSetupEnabled) {
await this.multiMainSetup.init(); await this.multiMainSetup.init();
} else { } else {
config.set('instanceRole', 'leader'); this.instanceSettings.markAsLeader();
} }
this.isInitialized = true; this.isInitialized = true;

View file

@ -1,6 +1,7 @@
import config from '@/config'; import config from '@/config';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { TIME } from '@/constants'; import { TIME } from '@/constants';
import { InstanceSettings } from 'n8n-core';
import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
@ -16,6 +17,7 @@ type MultiMainEvents = {
export class MultiMainSetup extends TypedEmitter<MultiMainEvents> { export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly redisPublisher: RedisServicePubSubPublisher, private readonly redisPublisher: RedisServicePubSubPublisher,
private readonly redisClientService: RedisClientService, private readonly redisClientService: RedisClientService,
) { ) {
@ -50,7 +52,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
async shutdown() { async shutdown() {
clearInterval(this.leaderCheckInterval); clearInterval(this.leaderCheckInterval);
const isLeader = config.getEnv('instanceRole') === 'leader'; const { isLeader } = this.instanceSettings;
if (isLeader) await this.redisPublisher.clear(this.leaderKey); if (isLeader) await this.redisPublisher.clear(this.leaderKey);
} }
@ -69,8 +71,8 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
if (leaderId && leaderId !== this.instanceId) { if (leaderId && leaderId !== this.instanceId) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`); this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`);
if (config.getEnv('instanceRole') === 'leader') { if (this.instanceSettings.isLeader) {
config.set('instanceRole', 'follower'); this.instanceSettings.markAsFollower();
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery
@ -85,7 +87,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
`[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`, `[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`,
); );
config.set('instanceRole', 'follower'); this.instanceSettings.markAsFollower();
/** /**
* Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery
@ -106,7 +108,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
if (keySetSuccessfully) { if (keySetSuccessfully) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`); this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`);
config.set('instanceRole', 'leader'); this.instanceSettings.markAsLeader();
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
@ -115,7 +117,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
*/ */
this.emit('leader-takeover'); this.emit('leader-takeover');
} else { } else {
config.set('instanceRole', 'follower'); this.instanceSettings.markAsFollower();
} }
} }

View file

@ -1,5 +1,5 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core'; import { BinaryDataService, InstanceSettings } from 'n8n-core';
import { inTest, TIME } from '@/constants'; import { inTest, TIME } from '@/constants';
import config from '@/config'; import config from '@/config';
import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository';
@ -25,6 +25,7 @@ export class PruningService {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService, private readonly binaryDataService: BinaryDataService,
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
@ -56,7 +57,7 @@ export class PruningService {
if ( if (
config.getEnv('multiMainSetup.enabled') && config.getEnv('multiMainSetup.enabled') &&
config.getEnv('generic.instanceType') === 'main' && config.getEnv('generic.instanceType') === 'main' &&
config.getEnv('instanceRole') === 'follower' this.instanceSettings.isFollower
) { ) {
return false; return false;
} }

View file

@ -1,5 +1,5 @@
import config from '@/config'; import config from '@/config';
import { BinaryDataService } from 'n8n-core'; import { BinaryDataService, InstanceSettings } from 'n8n-core';
import type { ExecutionStatus } from 'n8n-workflow'; import type { ExecutionStatus } from 'n8n-workflow';
import Container from 'typedi'; import Container from 'typedi';
@ -15,10 +15,11 @@ import { mockInstance } from '../shared/mocking';
import { createWorkflow } from './shared/db/workflows'; import { createWorkflow } from './shared/db/workflows';
import { createExecution, createSuccessfulExecution } from './shared/db/executions'; import { createExecution, createSuccessfulExecution } from './shared/db/executions';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { OrchestrationService } from '@/services/orchestration.service';
describe('softDeleteOnPruningCycle()', () => { describe('softDeleteOnPruningCycle()', () => {
let pruningService: PruningService; let pruningService: PruningService;
const instanceSettings = new InstanceSettings();
instanceSettings.markAsLeader();
const now = new Date(); const now = new Date();
const yesterday = new Date(Date.now() - TIME.DAY); const yesterday = new Date(Date.now() - TIME.DAY);
@ -29,9 +30,10 @@ describe('softDeleteOnPruningCycle()', () => {
pruningService = new PruningService( pruningService = new PruningService(
mockInstance(Logger), mockInstance(Logger),
instanceSettings,
Container.get(ExecutionRepository), Container.get(ExecutionRepository),
mockInstance(BinaryDataService), mockInstance(BinaryDataService),
mock<OrchestrationService>(), mock(),
); );
workflow = await createWorkflow(); workflow = await createWorkflow();

View file

@ -10,7 +10,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => { describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>(); const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>(); const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup); const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup);
const execution = mock<IExecutionResponse>({ const execution = mock<IExecutionResponse>({
id: '123', id: '123',

View file

@ -1,4 +1,9 @@
import Container from 'typedi'; import Container from 'typedi';
import type Redis from 'ioredis';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import type { WorkflowActivateMode } from 'n8n-workflow';
import config from '@/config'; import config from '@/config';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
@ -13,11 +18,9 @@ import { Logger } from '@/Logger';
import { Push } from '@/push'; import { Push } from '@/push';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { mockInstance } from '../../shared/mocking'; import { mockInstance } from '../../shared/mocking';
import type { WorkflowActivateMode } from 'n8n-workflow';
import { RedisClientService } from '@/services/redis/redis-client.service'; import { RedisClientService } from '@/services/redis/redis-client.service';
import type Redis from 'ioredis';
import { mock } from 'jest-mock-extended';
const instanceSettings = Container.get(InstanceSettings);
const redisClientService = mockInstance(RedisClientService); const redisClientService = mockInstance(RedisClientService);
const mockRedisClient = mock<Redis>(); const mockRedisClient = mock<Redis>();
redisClientService.createClient.mockReturnValue(mockRedisClient); redisClientService.createClient.mockReturnValue(mockRedisClient);
@ -72,6 +75,10 @@ describe('Orchestration Service', () => {
queueModeId = config.get('redis.queueModeId'); queueModeId = config.get('redis.queueModeId');
}); });
beforeEach(() => {
instanceSettings.markAsLeader();
});
afterAll(async () => { afterAll(async () => {
jest.mock('@/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); jest.mock('@/services/redis/RedisServicePubSubPublisher').restoreAllMocks();
jest.mock('@/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); jest.mock('@/services/redis/RedisServicePubSubSubscriber').restoreAllMocks();
@ -141,13 +148,10 @@ describe('Orchestration Service', () => {
); );
expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2); expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2);
expect(res1!.payload).toBeUndefined(); expect(res1!.payload).toBeUndefined();
expect((res2!.payload as { result: string }).result).toEqual('debounced'); expect(res2!.payload).toEqual({ result: 'debounced' });
}); });
describe('shouldAddWebhooks', () => { describe('shouldAddWebhooks', () => {
beforeEach(() => {
config.set('instanceRole', 'leader');
});
test('should return true for init', () => { test('should return true for init', () => {
// We want to ensure that webhooks are populated on init // We want to ensure that webhooks are populated on init
// more https://github.com/n8n-io/n8n/pull/8830 // more https://github.com/n8n-io/n8n/pull/8830
@ -169,7 +173,7 @@ describe('Orchestration Service', () => {
}); });
test('should return false for update or activate when not leader', () => { test('should return false for update or activate when not leader', () => {
config.set('instanceRole', 'follower'); instanceSettings.markAsFollower();
const modes = ['update', 'activate'] as WorkflowActivateMode[]; const modes = ['update', 'activate'] as WorkflowActivateMode[];
for (const mode of modes) { for (const mode of modes) {
const result = os.shouldAddWebhooks(mode); const result = os.shouldAddWebhooks(mode);

View file

@ -14,6 +14,8 @@ interface WritableSettings {
type Settings = ReadOnlySettings & WritableSettings; type Settings = ReadOnlySettings & WritableSettings;
type InstanceRole = 'unset' | 'leader' | 'follower';
const inTest = process.env.NODE_ENV === 'test'; const inTest = process.env.NODE_ENV === 'test';
@Service() @Service()
@ -38,6 +40,25 @@ export class InstanceSettings {
readonly instanceId = this.generateInstanceId(); readonly instanceId = this.generateInstanceId();
/** Always `leader` in single-main setup. `leader` or `follower` in multi-main setup. */
private instanceRole: InstanceRole = 'unset';
get isLeader() {
return this.instanceRole === 'leader';
}
markAsLeader() {
this.instanceRole = 'leader';
}
get isFollower() {
return this.instanceRole === 'follower';
}
markAsFollower() {
this.instanceRole = 'follower';
}
get encryptionKey() { get encryptionKey() {
return this.settings.encryptionKey; return this.settings.encryptionKey;
} }