mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-28 22:19:41 -08:00
refactor(core): Move multi-main state to InstanceSettings
(#12144)
This commit is contained in:
parent
77e2c75ca6
commit
28f1f6b561
|
@ -38,7 +38,7 @@ describe('License', () => {
|
||||||
license: licenseConfig,
|
license: licenseConfig,
|
||||||
multiMainSetup: { enabled: false },
|
multiMainSetup: { enabled: false },
|
||||||
});
|
});
|
||||||
license = new License(mockLogger(), instanceSettings, mock(), mock(), mock(), globalConfig);
|
license = new License(mockLogger(), instanceSettings, mock(), mock(), globalConfig);
|
||||||
await license.init();
|
await license.init();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -70,7 +70,6 @@ describe('License', () => {
|
||||||
mock<InstanceSettings>({ instanceType: 'worker' }),
|
mock<InstanceSettings>({ instanceType: 'worker' }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
|
||||||
mock<GlobalConfig>({ license: licenseConfig }),
|
mock<GlobalConfig>({ license: licenseConfig }),
|
||||||
);
|
);
|
||||||
await license.init();
|
await license.init();
|
||||||
|
@ -211,7 +210,6 @@ describe('License', () => {
|
||||||
mock<InstanceSettings>({ instanceType: 'main' }),
|
mock<InstanceSettings>({ instanceType: 'main' }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
|
||||||
globalConfig,
|
globalConfig,
|
||||||
).init();
|
).init();
|
||||||
|
|
||||||
|
@ -229,7 +227,6 @@ describe('License', () => {
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
|
||||||
).init();
|
).init();
|
||||||
|
|
||||||
expect(LicenseManager).toHaveBeenCalledWith(
|
expect(LicenseManager).toHaveBeenCalledWith(
|
||||||
|
@ -250,7 +247,7 @@ describe('License', () => {
|
||||||
});
|
});
|
||||||
config.set('multiMainSetup.instanceType', status);
|
config.set('multiMainSetup.instanceType', status);
|
||||||
|
|
||||||
await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
|
await new License(mockLogger(), mock(), mock(), mock(), globalConfig).init();
|
||||||
|
|
||||||
expect(LicenseManager).toHaveBeenCalledWith(
|
expect(LicenseManager).toHaveBeenCalledWith(
|
||||||
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
|
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
|
||||||
|
@ -267,7 +264,7 @@ describe('License', () => {
|
||||||
});
|
});
|
||||||
config.set('multiMainSetup.instanceType', status);
|
config.set('multiMainSetup.instanceType', status);
|
||||||
|
|
||||||
await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
|
await new License(mockLogger(), mock(), mock(), mock(), globalConfig).init();
|
||||||
|
|
||||||
expect(LicenseManager).toHaveBeenCalledWith(
|
expect(LicenseManager).toHaveBeenCalledWith(
|
||||||
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
|
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
|
||||||
|
@ -281,7 +278,7 @@ describe('License', () => {
|
||||||
});
|
});
|
||||||
config.set('multiMainSetup.instanceType', 'leader');
|
config.set('multiMainSetup.instanceType', 'leader');
|
||||||
|
|
||||||
await new License(mockLogger(), mock(), mock(), mock(), mock(), globalConfig).init();
|
await new License(mockLogger(), mock(), mock(), mock(), globalConfig).init();
|
||||||
|
|
||||||
expect(LicenseManager).toHaveBeenCalledWith(
|
expect(LicenseManager).toHaveBeenCalledWith(
|
||||||
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
|
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
|
||||||
|
@ -293,7 +290,7 @@ describe('License', () => {
|
||||||
|
|
||||||
describe('reinit', () => {
|
describe('reinit', () => {
|
||||||
it('should reinitialize license manager', async () => {
|
it('should reinitialize license manager', async () => {
|
||||||
const license = new License(mockLogger(), mock(), mock(), mock(), mock(), mock());
|
const license = new License(mockLogger(), mock(), mock(), mock(), mock());
|
||||||
await license.init();
|
await license.init();
|
||||||
|
|
||||||
const initSpy = jest.spyOn(license, 'init');
|
const initSpy = jest.spyOn(license, 'init');
|
||||||
|
|
|
@ -23,7 +23,7 @@ describe('WaitTracker', () => {
|
||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
const multiMainSetup = mock<MultiMainSetup>();
|
const multiMainSetup = mock<MultiMainSetup>();
|
||||||
const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock());
|
const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock());
|
||||||
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
|
const instanceSettings = mock<InstanceSettings>({ isLeader: true, isMultiMain: false });
|
||||||
|
|
||||||
const project = mock<Project>({ id: 'projectId' });
|
const project = mock<Project>({ id: 'projectId' });
|
||||||
const execution = mock<IExecutionResponse>({
|
const execution = mock<IExecutionResponse>({
|
||||||
|
@ -221,8 +221,6 @@ describe('WaitTracker', () => {
|
||||||
|
|
||||||
describe('multi-main setup', () => {
|
describe('multi-main setup', () => {
|
||||||
it('should start tracking if leader', () => {
|
it('should start tracking if leader', () => {
|
||||||
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
waitTracker.init();
|
waitTracker.init();
|
||||||
|
@ -238,9 +236,8 @@ describe('WaitTracker', () => {
|
||||||
activeExecutions,
|
activeExecutions,
|
||||||
workflowRunner,
|
workflowRunner,
|
||||||
orchestrationService,
|
orchestrationService,
|
||||||
mock<InstanceSettings>({ isLeader: false }),
|
mock<InstanceSettings>({ isLeader: false, isMultiMain: false }),
|
||||||
);
|
);
|
||||||
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
|
|
|
@ -511,7 +511,7 @@ export class ActiveWorkflowManager {
|
||||||
existingWorkflow?: WorkflowEntity,
|
existingWorkflow?: WorkflowEntity,
|
||||||
{ shouldPublish } = { shouldPublish: true },
|
{ shouldPublish } = { shouldPublish: true },
|
||||||
) {
|
) {
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled && shouldPublish) {
|
if (this.instanceSettings.isMultiMain && shouldPublish) {
|
||||||
void this.publisher.publishCommand({
|
void this.publisher.publishCommand({
|
||||||
command: 'add-webhooks-triggers-and-pollers',
|
command: 'add-webhooks-triggers-and-pollers',
|
||||||
payload: { workflowId },
|
payload: { workflowId },
|
||||||
|
@ -703,7 +703,7 @@ export class ActiveWorkflowManager {
|
||||||
// TODO: this should happen in a transaction
|
// TODO: this should happen in a transaction
|
||||||
// maybe, see: https://github.com/n8n-io/n8n/pull/8904#discussion_r1530150510
|
// maybe, see: https://github.com/n8n-io/n8n/pull/8904#discussion_r1530150510
|
||||||
async remove(workflowId: string) {
|
async remove(workflowId: string) {
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled) {
|
if (this.instanceSettings.isMultiMain) {
|
||||||
try {
|
try {
|
||||||
await this.clearWebhooks(workflowId);
|
await this.clearWebhooks(workflowId);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|
|
@ -100,7 +100,7 @@ export class Start extends BaseCommand {
|
||||||
|
|
||||||
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
|
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
|
|
||||||
if (Container.get(OrchestrationService).isMultiMainSetupEnabled) {
|
if (this.instanceSettings.isMultiMain) {
|
||||||
await Container.get(OrchestrationService).shutdown();
|
await Container.get(OrchestrationService).shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,6 +192,9 @@ export class Start extends BaseCommand {
|
||||||
await super.init();
|
await super.init();
|
||||||
this.activeWorkflowManager = Container.get(ActiveWorkflowManager);
|
this.activeWorkflowManager = Container.get(ActiveWorkflowManager);
|
||||||
|
|
||||||
|
this.instanceSettings.setMultiMainEnabled(
|
||||||
|
config.getEnv('executions.mode') === 'queue' && this.globalConfig.multiMainSetup.enabled,
|
||||||
|
);
|
||||||
await this.initLicense();
|
await this.initLicense();
|
||||||
|
|
||||||
await this.initOrchestration();
|
await this.initOrchestration();
|
||||||
|
@ -253,7 +256,7 @@ export class Start extends BaseCommand {
|
||||||
|
|
||||||
this.logger.scoped(['scaling', 'pubsub']).debug('Pubsub setup completed');
|
this.logger.scoped(['scaling', 'pubsub']).debug('Pubsub setup completed');
|
||||||
|
|
||||||
if (!orchestrationService.isMultiMainSetupEnabled) return;
|
if (this.instanceSettings.isSingleMain) return;
|
||||||
|
|
||||||
orchestrationService.multiMainSetup
|
orchestrationService.multiMainSetup
|
||||||
.on('leader-stepdown', async () => {
|
.on('leader-stepdown', async () => {
|
||||||
|
|
|
@ -9,7 +9,6 @@ import { SettingsRepository } from '@/databases/repositories/settings.repository
|
||||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||||
import { Logger } from '@/logging/logger.service';
|
import { Logger } from '@/logging/logger.service';
|
||||||
import { LicenseMetricsService } from '@/metrics/license-metrics.service';
|
import { LicenseMetricsService } from '@/metrics/license-metrics.service';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
import {
|
import {
|
||||||
LICENSE_FEATURES,
|
LICENSE_FEATURES,
|
||||||
|
@ -35,7 +34,6 @@ export class License {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly instanceSettings: InstanceSettings,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
private readonly orchestrationService: OrchestrationService,
|
|
||||||
private readonly settingsRepository: SettingsRepository,
|
private readonly settingsRepository: SettingsRepository,
|
||||||
private readonly licenseMetricsService: LicenseMetricsService,
|
private readonly licenseMetricsService: LicenseMetricsService,
|
||||||
private readonly globalConfig: GlobalConfig,
|
private readonly globalConfig: GlobalConfig,
|
||||||
|
@ -138,22 +136,23 @@ export class License {
|
||||||
this.logger.debug('License feature change detected', _features);
|
this.logger.debug('License feature change detected', _features);
|
||||||
|
|
||||||
if (config.getEnv('executions.mode') === 'queue' && this.globalConfig.multiMainSetup.enabled) {
|
if (config.getEnv('executions.mode') === 'queue' && this.globalConfig.multiMainSetup.enabled) {
|
||||||
const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as
|
const isMultiMainLicensed =
|
||||||
| boolean
|
(_features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as boolean | undefined) ?? false;
|
||||||
| undefined;
|
|
||||||
|
|
||||||
this.orchestrationService.setMultiMainSetupLicensed(isMultiMainLicensed ?? false);
|
this.instanceSettings.setMultiMainLicensed(isMultiMainLicensed);
|
||||||
|
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled && this.instanceSettings.isFollower) {
|
if (this.instanceSettings.isMultiMain && !this.instanceSettings.isLeader) {
|
||||||
this.logger.debug(
|
this.logger
|
||||||
'[Multi-main setup] Instance is follower, skipping sending of "reload-license" command...',
|
.scoped(['scaling', 'multi-main-setup', 'license'])
|
||||||
);
|
.debug('Instance is not leader, skipping sending of "reload-license" command...');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled && !isMultiMainLicensed) {
|
if (this.globalConfig.multiMainSetup.enabled && !isMultiMainLicensed) {
|
||||||
this.logger.debug(
|
this.logger
|
||||||
'[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supports this feature.',
|
.scoped(['scaling', 'multi-main-setup', 'license'])
|
||||||
|
.debug(
|
||||||
|
'License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supports this feature.',
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ import type { PushPayload, PushType } from '@n8n/api-types';
|
||||||
import type { Application } from 'express';
|
import type { Application } from 'express';
|
||||||
import { ServerResponse } from 'http';
|
import { ServerResponse } from 'http';
|
||||||
import type { Server } from 'http';
|
import type { Server } from 'http';
|
||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import type { Socket } from 'net';
|
import type { Socket } from 'net';
|
||||||
import { Container, Service } from 'typedi';
|
import { Container, Service } from 'typedi';
|
||||||
import { parse as parseUrl } from 'url';
|
import { parse as parseUrl } from 'url';
|
||||||
|
@ -13,7 +14,6 @@ import type { User } from '@/databases/entities/user';
|
||||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||||
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
import { TypedEmitter } from '@/typed-emitter';
|
import { TypedEmitter } from '@/typed-emitter';
|
||||||
|
|
||||||
import { SSEPush } from './sse.push';
|
import { SSEPush } from './sse.push';
|
||||||
|
@ -41,7 +41,7 @@ export class Push extends TypedEmitter<PushEvents> {
|
||||||
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly orchestrationService: OrchestrationService,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
private readonly publisher: Publisher,
|
private readonly publisher: Publisher,
|
||||||
) {
|
) {
|
||||||
super();
|
super();
|
||||||
|
@ -92,7 +92,7 @@ export class Push extends TypedEmitter<PushEvents> {
|
||||||
* the webhook. If so, the handler process commands the creator process to
|
* the webhook. If so, the handler process commands the creator process to
|
||||||
* relay the former's execution lifecycle events to the creator's frontend.
|
* relay the former's execution lifecycle events to the creator's frontend.
|
||||||
*/
|
*/
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled && !this.backend.hasPushRef(pushRef)) {
|
if (this.instanceSettings.isMultiMain && !this.backend.hasPushRef(pushRef)) {
|
||||||
void this.publisher.publishCommand({
|
void this.publisher.publishCommand({
|
||||||
command: 'relay-execution-lifecycle-event',
|
command: 'relay-execution-lifecycle-event',
|
||||||
payload: { type, args: data, pushRef },
|
payload: { type, args: data, pushRef },
|
||||||
|
|
|
@ -5,7 +5,6 @@ import { InstanceSettings } from 'n8n-core';
|
||||||
import { ApplicationError } from 'n8n-workflow';
|
import { ApplicationError } from 'n8n-workflow';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
|
|
||||||
import type { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
import { mockInstance, mockLogger } from '@test/mocking';
|
import { mockInstance, mockLogger } from '@test/mocking';
|
||||||
|
|
||||||
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
|
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
|
||||||
|
@ -47,7 +46,6 @@ describe('ScalingService', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
const instanceSettings = Container.get(InstanceSettings);
|
const instanceSettings = Container.get(InstanceSettings);
|
||||||
const orchestrationService = mock<OrchestrationService>({ isMultiMainSetupEnabled: false });
|
|
||||||
const jobProcessor = mock<JobProcessor>();
|
const jobProcessor = mock<JobProcessor>();
|
||||||
|
|
||||||
let scalingService: ScalingService;
|
let scalingService: ScalingService;
|
||||||
|
@ -82,7 +80,7 @@ describe('ScalingService', () => {
|
||||||
globalConfig,
|
globalConfig,
|
||||||
mock(),
|
mock(),
|
||||||
instanceSettings,
|
instanceSettings,
|
||||||
orchestrationService,
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -66,9 +66,11 @@ export class ScalingService {
|
||||||
|
|
||||||
this.registerListeners();
|
this.registerListeners();
|
||||||
|
|
||||||
if (this.instanceSettings.isLeader) this.scheduleQueueRecovery();
|
const { isLeader, isMultiMain } = this.instanceSettings;
|
||||||
|
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled) {
|
if (isLeader) this.scheduleQueueRecovery();
|
||||||
|
|
||||||
|
if (isMultiMain) {
|
||||||
this.orchestrationService.multiMainSetup
|
this.orchestrationService.multiMainSetup
|
||||||
.on('leader-takeover', () => this.scheduleQueueRecovery())
|
.on('leader-takeover', () => this.scheduleQueueRecovery())
|
||||||
.on('leader-stepdown', () => this.stopQueueRecovery());
|
.on('leader-stepdown', () => this.stopQueueRecovery());
|
||||||
|
@ -127,7 +129,7 @@ export class ScalingService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async stopMain() {
|
private async stopMain() {
|
||||||
if (this.orchestrationService.isSingleMainSetup) {
|
if (this.instanceSettings.isSingleMain) {
|
||||||
await this.queue.pause(true, true); // no more jobs will be picked up
|
await this.queue.pause(true, true); // no more jobs will be picked up
|
||||||
this.logger.debug('Queue paused');
|
this.logger.debug('Queue paused');
|
||||||
}
|
}
|
||||||
|
@ -373,7 +375,7 @@ export class ScalingService {
|
||||||
return (
|
return (
|
||||||
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
||||||
this.instanceSettings.instanceType === 'main' &&
|
this.instanceSettings.instanceType === 'main' &&
|
||||||
!this.orchestrationService.isMultiMainSetupEnabled
|
this.instanceSettings.isSingleMain
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,6 @@ import { setupPushServer, setupPushHandler, Push } from '@/push';
|
||||||
import type { APIRequest } from '@/requests';
|
import type { APIRequest } from '@/requests';
|
||||||
import * as ResponseHelper from '@/response-helper';
|
import * as ResponseHelper from '@/response-helper';
|
||||||
import type { FrontendService } from '@/services/frontend.service';
|
import type { FrontendService } from '@/services/frontend.service';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
import '@/controllers/active-workflows.controller';
|
import '@/controllers/active-workflows.controller';
|
||||||
import '@/controllers/annotation-tags.controller.ee';
|
import '@/controllers/annotation-tags.controller.ee';
|
||||||
|
@ -79,7 +78,6 @@ export class Server extends AbstractServer {
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly loadNodesAndCredentials: LoadNodesAndCredentials,
|
private readonly loadNodesAndCredentials: LoadNodesAndCredentials,
|
||||||
private readonly orchestrationService: OrchestrationService,
|
|
||||||
private readonly postHogClient: PostHogClient,
|
private readonly postHogClient: PostHogClient,
|
||||||
private readonly eventService: EventService,
|
private readonly eventService: EventService,
|
||||||
private readonly instanceSettings: InstanceSettings,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
|
@ -111,7 +109,7 @@ export class Server extends AbstractServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async registerAdditionalControllers() {
|
private async registerAdditionalControllers() {
|
||||||
if (!inProduction && this.orchestrationService.isMultiMainSetupEnabled) {
|
if (!inProduction && this.instanceSettings.isMultiMain) {
|
||||||
await import('@/controllers/debug.controller');
|
await import('@/controllers/debug.controller');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,29 +22,6 @@ export class OrchestrationService {
|
||||||
|
|
||||||
isInitialized = false;
|
isInitialized = false;
|
||||||
|
|
||||||
private isMultiMainSetupLicensed = false;
|
|
||||||
|
|
||||||
setMultiMainSetupLicensed(newState: boolean) {
|
|
||||||
this.isMultiMainSetupLicensed = newState;
|
|
||||||
}
|
|
||||||
|
|
||||||
get isMultiMainSetupEnabled() {
|
|
||||||
return (
|
|
||||||
config.getEnv('executions.mode') === 'queue' &&
|
|
||||||
this.globalConfig.multiMainSetup.enabled &&
|
|
||||||
this.instanceSettings.instanceType === 'main' &&
|
|
||||||
this.isMultiMainSetupLicensed
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
get isSingleMainSetup() {
|
|
||||||
return !this.isMultiMainSetupEnabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
sanityCheck() {
|
|
||||||
return this.isInitialized && config.get('executions.mode') === 'queue';
|
|
||||||
}
|
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
if (this.isInitialized) return;
|
if (this.isInitialized) return;
|
||||||
|
|
||||||
|
@ -56,7 +33,7 @@ export class OrchestrationService {
|
||||||
this.subscriber = Container.get(Subscriber);
|
this.subscriber = Container.get(Subscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.isMultiMainSetupEnabled) {
|
if (this.instanceSettings.isMultiMain) {
|
||||||
await this.multiMainSetup.init();
|
await this.multiMainSetup.init();
|
||||||
} else {
|
} else {
|
||||||
this.instanceSettings.markAsLeader();
|
this.instanceSettings.markAsLeader();
|
||||||
|
@ -69,7 +46,7 @@ export class OrchestrationService {
|
||||||
async shutdown() {
|
async shutdown() {
|
||||||
if (!this.isInitialized) return;
|
if (!this.isInitialized) return;
|
||||||
|
|
||||||
if (this.isMultiMainSetupEnabled) await this.multiMainSetup.shutdown();
|
if (this.instanceSettings.isMultiMain) await this.multiMainSetup.shutdown();
|
||||||
|
|
||||||
this.publisher.shutdown();
|
this.publisher.shutdown();
|
||||||
this.subscriber.shutdown();
|
this.subscriber.shutdown();
|
||||||
|
|
|
@ -17,11 +17,10 @@ describe('PruningService', () => {
|
||||||
it('should start pruning on main instance that is the leader', () => {
|
it('should start pruning on main instance that is the leader', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true }),
|
mock<InstanceSettings>({ isLeader: true, isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock(),
|
mock(),
|
||||||
|
@ -36,11 +35,10 @@ describe('PruningService', () => {
|
||||||
it('should not start pruning on main instance that is a follower', () => {
|
it('should not start pruning on main instance that is a follower', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: false }),
|
mock<InstanceSettings>({ isLeader: false, isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock(),
|
mock(),
|
||||||
|
@ -55,11 +53,10 @@ describe('PruningService', () => {
|
||||||
it('should register leadership events if main on multi-main setup', () => {
|
it('should register leadership events if main on multi-main setup', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true }),
|
mock<InstanceSettings>({ isLeader: true, isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn() }),
|
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn() }),
|
||||||
}),
|
}),
|
||||||
mock(),
|
mock(),
|
||||||
|
@ -85,11 +82,10 @@ describe('PruningService', () => {
|
||||||
it('should return `true` based on config if leader main', () => {
|
it('should return `true` based on config if leader main', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock<ExecutionsConfig>({ pruneData: true }),
|
mock<ExecutionsConfig>({ pruneData: true }),
|
||||||
|
@ -101,11 +97,10 @@ describe('PruningService', () => {
|
||||||
it('should return `false` based on config if leader main', () => {
|
it('should return `false` based on config if leader main', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock<ExecutionsConfig>({ pruneData: false }),
|
mock<ExecutionsConfig>({ pruneData: false }),
|
||||||
|
@ -117,11 +112,10 @@ describe('PruningService', () => {
|
||||||
it('should return `false` if non-main even if config is enabled', () => {
|
it('should return `false` if non-main even if config is enabled', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker' }),
|
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock<ExecutionsConfig>({ pruneData: true }),
|
mock<ExecutionsConfig>({ pruneData: true }),
|
||||||
|
@ -133,11 +127,15 @@ describe('PruningService', () => {
|
||||||
it('should return `false` if follower main even if config is enabled', () => {
|
it('should return `false` if follower main even if config is enabled', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: false, isFollower: true, instanceType: 'main' }),
|
mock<InstanceSettings>({
|
||||||
|
isLeader: false,
|
||||||
|
isFollower: true,
|
||||||
|
instanceType: 'main',
|
||||||
|
isMultiMain: true,
|
||||||
|
}),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock<ExecutionsConfig>({ pruneData: true }),
|
mock<ExecutionsConfig>({ pruneData: true }),
|
||||||
|
@ -151,11 +149,10 @@ describe('PruningService', () => {
|
||||||
it('should not start pruning if service is disabled', () => {
|
it('should not start pruning if service is disabled', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock<ExecutionsConfig>({ pruneData: false }),
|
mock<ExecutionsConfig>({ pruneData: false }),
|
||||||
|
@ -179,11 +176,10 @@ describe('PruningService', () => {
|
||||||
it('should start pruning if service is enabled and DB is migrated', () => {
|
it('should start pruning if service is enabled and DB is migrated', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new PruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock<OrchestrationService>({
|
mock<OrchestrationService>({
|
||||||
isMultiMainSetupEnabled: true,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>(),
|
multiMainSetup: mock<MultiMainSetup>(),
|
||||||
}),
|
}),
|
||||||
mock<ExecutionsConfig>({ pruneData: true }),
|
mock<ExecutionsConfig>({ pruneData: true }),
|
||||||
|
|
|
@ -51,7 +51,7 @@ export class PruningService {
|
||||||
|
|
||||||
if (this.instanceSettings.isLeader) this.startPruning();
|
if (this.instanceSettings.isLeader) this.startPruning();
|
||||||
|
|
||||||
if (this.orchestrationService.isMultiMainSetupEnabled) {
|
if (this.instanceSettings.isMultiMain) {
|
||||||
this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning());
|
this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning());
|
||||||
this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning());
|
this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning());
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,12 +40,11 @@ export class WaitTracker {
|
||||||
* @important Requires `OrchestrationService` to be initialized.
|
* @important Requires `OrchestrationService` to be initialized.
|
||||||
*/
|
*/
|
||||||
init() {
|
init() {
|
||||||
const { isLeader } = this.instanceSettings;
|
const { isLeader, isMultiMain } = this.instanceSettings;
|
||||||
const { isMultiMainSetupEnabled } = this.orchestrationService;
|
|
||||||
|
|
||||||
if (isLeader) this.startTracking();
|
if (isLeader) this.startTracking();
|
||||||
|
|
||||||
if (isMultiMainSetupEnabled) {
|
if (isMultiMain) {
|
||||||
this.orchestrationService.multiMainSetup
|
this.orchestrationService.multiMainSetup
|
||||||
.on('leader-takeover', () => this.startTracking())
|
.on('leader-takeover', () => this.startTracking())
|
||||||
.on('leader-stepdown', () => this.stopTracking());
|
.on('leader-stepdown', () => this.stopTracking());
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import type { InstanceSettings } from 'n8n-core';
|
||||||
|
|
||||||
import type { CacheService } from '@/services/cache/cache.service';
|
import type { CacheService } from '@/services/cache/cache.service';
|
||||||
import type { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
|
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
|
||||||
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
|
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ describe('TestWebhookRegistrationsService', () => {
|
||||||
const cacheService = mock<CacheService>();
|
const cacheService = mock<CacheService>();
|
||||||
const registrations = new TestWebhookRegistrationsService(
|
const registrations = new TestWebhookRegistrationsService(
|
||||||
cacheService,
|
cacheService,
|
||||||
mock<OrchestrationService>({ isMultiMainSetupEnabled: false }),
|
mock<InstanceSettings>({ isMultiMain: false }),
|
||||||
);
|
);
|
||||||
|
|
||||||
const registration = mock<TestWebhookRegistration>({
|
const registration = mock<TestWebhookRegistration>({
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import type { IWebhookData } from 'n8n-workflow';
|
import type { IWebhookData } from 'n8n-workflow';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
|
|
||||||
import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants';
|
import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants';
|
||||||
import type { IWorkflowDb } from '@/interfaces';
|
import type { IWorkflowDb } from '@/interfaces';
|
||||||
import { CacheService } from '@/services/cache/cache.service';
|
import { CacheService } from '@/services/cache/cache.service';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
export type TestWebhookRegistration = {
|
export type TestWebhookRegistration = {
|
||||||
pushRef?: string;
|
pushRef?: string;
|
||||||
|
@ -17,7 +17,7 @@ export type TestWebhookRegistration = {
|
||||||
export class TestWebhookRegistrationsService {
|
export class TestWebhookRegistrationsService {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly cacheService: CacheService,
|
private readonly cacheService: CacheService,
|
||||||
private readonly orchestrationService: OrchestrationService,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
private readonly cacheKey = 'test-webhooks';
|
private readonly cacheKey = 'test-webhooks';
|
||||||
|
@ -27,7 +27,7 @@ export class TestWebhookRegistrationsService {
|
||||||
|
|
||||||
await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration });
|
await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration });
|
||||||
|
|
||||||
if (!this.orchestrationService.isMultiMainSetupEnabled) return;
|
if (this.instanceSettings.isSingleMain) return;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Multi-main setup: In a manual webhook execution, the main process that
|
* Multi-main setup: In a manual webhook execution, the main process that
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import type express from 'express';
|
import type express from 'express';
|
||||||
import * as NodeExecuteFunctions from 'n8n-core';
|
import * as NodeExecuteFunctions from 'n8n-core';
|
||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import { WebhookPathTakenError, Workflow } from 'n8n-workflow';
|
import { WebhookPathTakenError, Workflow } from 'n8n-workflow';
|
||||||
import type {
|
import type {
|
||||||
IWebhookData,
|
IWebhookData,
|
||||||
|
@ -17,7 +18,6 @@ import type { IWorkflowDb } from '@/interfaces';
|
||||||
import { NodeTypes } from '@/node-types';
|
import { NodeTypes } from '@/node-types';
|
||||||
import { Push } from '@/push';
|
import { Push } from '@/push';
|
||||||
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
import { Publisher } from '@/scaling/pubsub/publisher.service';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
import { removeTrailingSlash } from '@/utils';
|
import { removeTrailingSlash } from '@/utils';
|
||||||
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
|
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
|
||||||
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
|
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
|
||||||
|
@ -42,7 +42,7 @@ export class TestWebhooks implements IWebhookManager {
|
||||||
private readonly push: Push,
|
private readonly push: Push,
|
||||||
private readonly nodeTypes: NodeTypes,
|
private readonly nodeTypes: NodeTypes,
|
||||||
private readonly registrations: TestWebhookRegistrationsService,
|
private readonly registrations: TestWebhookRegistrationsService,
|
||||||
private readonly orchestrationService: OrchestrationService,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
private readonly publisher: Publisher,
|
private readonly publisher: Publisher,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
@ -155,7 +155,7 @@ export class TestWebhooks implements IWebhookManager {
|
||||||
* the handler process commands the creator process to clear its test webhooks.
|
* the handler process commands the creator process to clear its test webhooks.
|
||||||
*/
|
*/
|
||||||
if (
|
if (
|
||||||
this.orchestrationService.isMultiMainSetupEnabled &&
|
this.instanceSettings.isMultiMain &&
|
||||||
pushRef &&
|
pushRef &&
|
||||||
!this.push.getBackend().hasPushRef(pushRef)
|
!this.push.getBackend().hasPushRef(pushRef)
|
||||||
) {
|
) {
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { BinaryDataService, UnrecognizedNodeTypeError, type DirectoryLoader } from 'n8n-core';
|
import {
|
||||||
|
BinaryDataService,
|
||||||
|
InstanceSettings,
|
||||||
|
UnrecognizedNodeTypeError,
|
||||||
|
type DirectoryLoader,
|
||||||
|
} from 'n8n-core';
|
||||||
import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials';
|
import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials';
|
||||||
import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials';
|
import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials';
|
||||||
import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node';
|
import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node';
|
||||||
|
@ -18,7 +23,6 @@ import { SettingsRepository } from '@/databases/repositories/settings.repository
|
||||||
import { ExecutionService } from '@/executions/execution.service';
|
import { ExecutionService } from '@/executions/execution.service';
|
||||||
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
||||||
import { Push } from '@/push';
|
import { Push } from '@/push';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
|
|
||||||
import { mockInstance } from '../../../shared/mocking';
|
import { mockInstance } from '../../../shared/mocking';
|
||||||
|
|
||||||
|
@ -32,8 +36,8 @@ export { setupTestServer } from './test-server';
|
||||||
* Initialize node types.
|
* Initialize node types.
|
||||||
*/
|
*/
|
||||||
export async function initActiveWorkflowManager() {
|
export async function initActiveWorkflowManager() {
|
||||||
mockInstance(OrchestrationService, {
|
mockInstance(InstanceSettings, {
|
||||||
isMultiMainSetupEnabled: false,
|
isMultiMain: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
mockInstance(Push);
|
mockInstance(Push);
|
||||||
|
|
|
@ -86,6 +86,29 @@ export class InstanceSettings {
|
||||||
*/
|
*/
|
||||||
readonly hostId: string;
|
readonly hostId: string;
|
||||||
|
|
||||||
|
private isMultiMainEnabled = false;
|
||||||
|
|
||||||
|
private isMultiMainLicensed = false;
|
||||||
|
|
||||||
|
/** Set whether multi-main mode is enabled. Does not imply licensed status. */
|
||||||
|
setMultiMainEnabled(newState: boolean) {
|
||||||
|
this.isMultiMainEnabled = newState;
|
||||||
|
}
|
||||||
|
|
||||||
|
setMultiMainLicensed(newState: boolean) {
|
||||||
|
this.isMultiMainLicensed = newState;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Whether this `main` instance is running in multi-main mode. */
|
||||||
|
get isMultiMain() {
|
||||||
|
return this.instanceType === 'main' && this.isMultiMainEnabled && this.isMultiMainLicensed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Whether this `main` instance is running in single-main mode. */
|
||||||
|
get isSingleMain() {
|
||||||
|
return !this.isMultiMain;
|
||||||
|
}
|
||||||
|
|
||||||
get isLeader() {
|
get isLeader() {
|
||||||
return this.instanceRole === 'leader';
|
return this.instanceRole === 'leader';
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue