mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-09 22:24:05 -08:00
refactor(core): Move instanceType
to InstanceSettings
(no-changelog) (#10640)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Waiting to run
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Waiting to run
This commit is contained in:
parent
50beefb658
commit
25c8a328a8
|
@ -1,13 +1,11 @@
|
||||||
import { LicenseManager } from '@n8n_io/license-sdk';
|
import { LicenseManager } from '@n8n_io/license-sdk';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { InstanceSettings } from 'n8n-core';
|
import type { InstanceSettings } from 'n8n-core';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { N8N_VERSION } from '@/constants';
|
import { N8N_VERSION } from '@/constants';
|
||||||
import { License } from '@/license';
|
import { License } from '@/license';
|
||||||
import { Logger } from '@/logger';
|
import type { Logger } from '@/logger';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
|
||||||
import { mockInstance } from '@test/mocking';
|
|
||||||
|
|
||||||
jest.mock('@n8n_io/license-sdk');
|
jest.mock('@n8n_io/license-sdk');
|
||||||
|
|
||||||
|
@ -27,9 +25,11 @@ describe('License', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
let license: License;
|
let license: License;
|
||||||
const logger = mockInstance(Logger);
|
const logger = mock<Logger>();
|
||||||
const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID });
|
const instanceSettings = mock<InstanceSettings>({
|
||||||
mockInstance(OrchestrationService);
|
instanceId: MOCK_INSTANCE_ID,
|
||||||
|
instanceType: 'main',
|
||||||
|
});
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
||||||
|
@ -56,8 +56,14 @@ describe('License', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
test('initializes license manager for worker', async () => {
|
test('initializes license manager for worker', async () => {
|
||||||
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
license = new License(
|
||||||
await license.init('worker');
|
logger,
|
||||||
|
mock<InstanceSettings>({ instanceType: 'worker' }),
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
);
|
||||||
|
await license.init();
|
||||||
expect(LicenseManager).toHaveBeenCalledWith({
|
expect(LicenseManager).toHaveBeenCalledWith({
|
||||||
autoRenewEnabled: false,
|
autoRenewEnabled: false,
|
||||||
autoRenewOffset: MOCK_RENEW_OFFSET,
|
autoRenewOffset: MOCK_RENEW_OFFSET,
|
||||||
|
@ -265,7 +271,7 @@ describe('License', () => {
|
||||||
|
|
||||||
await license.reinit();
|
await license.reinit();
|
||||||
|
|
||||||
expect(initSpy).toHaveBeenCalledWith('main', true);
|
expect(initSpy).toHaveBeenCalledWith(true);
|
||||||
|
|
||||||
expect(LicenseManager.prototype.reset).toHaveBeenCalled();
|
expect(LicenseManager.prototype.reset).toHaveBeenCalled();
|
||||||
expect(LicenseManager.prototype.initialize).toHaveBeenCalled();
|
expect(LicenseManager.prototype.initialize).toHaveBeenCalled();
|
||||||
|
|
|
@ -5,6 +5,7 @@ import { engine as expressHandlebars } from 'express-handlebars';
|
||||||
import { readFile } from 'fs/promises';
|
import { readFile } from 'fs/promises';
|
||||||
import type { Server } from 'http';
|
import type { Server } from 'http';
|
||||||
import isbot from 'isbot';
|
import isbot from 'isbot';
|
||||||
|
import type { InstanceType } from 'n8n-core';
|
||||||
import { Container, Service } from 'typedi';
|
import { Container, Service } from 'typedi';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
|
@ -12,7 +13,6 @@ import { N8N_VERSION, TEMPLATES_DIR, inDevelopment, inTest } from '@/constants';
|
||||||
import * as Db from '@/db';
|
import * as Db from '@/db';
|
||||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||||
import { ExternalHooks } from '@/external-hooks';
|
import { ExternalHooks } from '@/external-hooks';
|
||||||
import { N8nInstanceType } from '@/interfaces';
|
|
||||||
import { Logger } from '@/logger';
|
import { Logger } from '@/logger';
|
||||||
import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
|
import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
|
||||||
import { send, sendErrorResponse } from '@/response-helper';
|
import { send, sendErrorResponse } from '@/response-helper';
|
||||||
|
@ -61,7 +61,7 @@ export abstract class AbstractServer {
|
||||||
|
|
||||||
readonly uniqueInstanceId: string;
|
readonly uniqueInstanceId: string;
|
||||||
|
|
||||||
constructor(instanceType: N8nInstanceType = 'main') {
|
constructor(instanceType: Exclude<InstanceType, 'worker'>) {
|
||||||
this.app = express();
|
this.app = express();
|
||||||
this.app.disable('x-powered-by');
|
this.app.disable('x-powered-by');
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,6 @@ import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
|
||||||
import { initExpressionEvaluator } from '@/expression-evaluator';
|
import { initExpressionEvaluator } from '@/expression-evaluator';
|
||||||
import { ExternalHooks } from '@/external-hooks';
|
import { ExternalHooks } from '@/external-hooks';
|
||||||
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
|
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
|
||||||
import type { N8nInstanceType } from '@/interfaces';
|
|
||||||
import { License } from '@/license';
|
import { License } from '@/license';
|
||||||
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
||||||
import { Logger } from '@/logger';
|
import { Logger } from '@/logger';
|
||||||
|
@ -33,9 +32,7 @@ export abstract class BaseCommand extends Command {
|
||||||
|
|
||||||
protected nodeTypes: NodeTypes;
|
protected nodeTypes: NodeTypes;
|
||||||
|
|
||||||
protected instanceSettings: InstanceSettings;
|
protected instanceSettings: InstanceSettings = Container.get(InstanceSettings);
|
||||||
|
|
||||||
private instanceType: N8nInstanceType = 'main';
|
|
||||||
|
|
||||||
queueModeId: string;
|
queueModeId: string;
|
||||||
|
|
||||||
|
@ -62,9 +59,6 @@ export abstract class BaseCommand extends Command {
|
||||||
process.once('SIGTERM', this.onTerminationSignal('SIGTERM'));
|
process.once('SIGTERM', this.onTerminationSignal('SIGTERM'));
|
||||||
process.once('SIGINT', this.onTerminationSignal('SIGINT'));
|
process.once('SIGINT', this.onTerminationSignal('SIGINT'));
|
||||||
|
|
||||||
// Make sure the settings exist
|
|
||||||
this.instanceSettings = Container.get(InstanceSettings);
|
|
||||||
|
|
||||||
this.nodeTypes = Container.get(NodeTypes);
|
this.nodeTypes = Container.get(NodeTypes);
|
||||||
await Container.get(LoadNodesAndCredentials).init();
|
await Container.get(LoadNodesAndCredentials).init();
|
||||||
|
|
||||||
|
@ -128,17 +122,13 @@ export abstract class BaseCommand extends Command {
|
||||||
await Container.get(TelemetryEventRelay).init();
|
await Container.get(TelemetryEventRelay).init();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected setInstanceType(instanceType: N8nInstanceType) {
|
|
||||||
this.instanceType = instanceType;
|
|
||||||
config.set('generic.instanceType', instanceType);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected setInstanceQueueModeId() {
|
protected setInstanceQueueModeId() {
|
||||||
if (config.get('redis.queueModeId')) {
|
if (config.get('redis.queueModeId')) {
|
||||||
this.queueModeId = config.get('redis.queueModeId');
|
this.queueModeId = config.get('redis.queueModeId');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.queueModeId = generateHostInstanceId(this.instanceType);
|
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
|
||||||
|
this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!);
|
||||||
config.set('redis.queueModeId', this.queueModeId);
|
config.set('redis.queueModeId', this.queueModeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,7 +268,7 @@ export abstract class BaseCommand extends Command {
|
||||||
|
|
||||||
async initLicense(): Promise<void> {
|
async initLicense(): Promise<void> {
|
||||||
this.license = Container.get(License);
|
this.license = Container.get(License);
|
||||||
await this.license.init(this.instanceType ?? 'main');
|
await this.license.init();
|
||||||
|
|
||||||
const activationKey = config.getEnv('license.activationKey');
|
const activationKey = config.getEnv('license.activationKey');
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,6 @@ export class Start extends BaseCommand {
|
||||||
|
|
||||||
constructor(argv: string[], cmdConfig: Config) {
|
constructor(argv: string[], cmdConfig: Config) {
|
||||||
super(argv, cmdConfig);
|
super(argv, cmdConfig);
|
||||||
this.setInstanceType('main');
|
|
||||||
this.setInstanceQueueModeId();
|
this.setInstanceQueueModeId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ export class Webhook extends BaseCommand {
|
||||||
|
|
||||||
constructor(argv: string[], cmdConfig: Config) {
|
constructor(argv: string[], cmdConfig: Config) {
|
||||||
super(argv, cmdConfig);
|
super(argv, cmdConfig);
|
||||||
this.setInstanceType('webhook');
|
|
||||||
if (this.queueModeId) {
|
if (this.queueModeId) {
|
||||||
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
|
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,6 @@ export class Worker extends BaseCommand {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.setInstanceType('worker');
|
|
||||||
this.setInstanceQueueModeId();
|
this.setInstanceQueueModeId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -175,12 +175,6 @@ export const schema = {
|
||||||
env: 'GENERIC_TIMEZONE',
|
env: 'GENERIC_TIMEZONE',
|
||||||
},
|
},
|
||||||
|
|
||||||
instanceType: {
|
|
||||||
doc: 'Type of n8n instance',
|
|
||||||
format: ['main', 'webhook', 'worker'] as const,
|
|
||||||
default: 'main',
|
|
||||||
},
|
|
||||||
|
|
||||||
releaseChannel: {
|
releaseChannel: {
|
||||||
doc: 'N8N release channel',
|
doc: 'N8N release channel',
|
||||||
format: ['stable', 'beta', 'nightly', 'dev'] as const,
|
format: ['stable', 'beta', 'nightly', 'dev'] as const,
|
||||||
|
|
|
@ -1,14 +1,13 @@
|
||||||
|
import type { InstanceType } from 'n8n-core';
|
||||||
import { ALPHABET } from 'n8n-workflow';
|
import { ALPHABET } from 'n8n-workflow';
|
||||||
import { customAlphabet } from 'nanoid';
|
import { customAlphabet } from 'nanoid';
|
||||||
|
|
||||||
import type { N8nInstanceType } from '@/interfaces';
|
|
||||||
|
|
||||||
const nanoid = customAlphabet(ALPHABET, 16);
|
const nanoid = customAlphabet(ALPHABET, 16);
|
||||||
|
|
||||||
export function generateNanoId() {
|
export function generateNanoId() {
|
||||||
return nanoid();
|
return nanoid();
|
||||||
}
|
}
|
||||||
|
|
||||||
export function generateHostInstanceId(instanceType: N8nInstanceType) {
|
export function generateHostInstanceId(instanceType: InstanceType) {
|
||||||
return `${instanceType}-${nanoid()}`;
|
return `${instanceType}-${nanoid()}`;
|
||||||
}
|
}
|
||||||
|
|
|
@ -422,5 +422,3 @@ export abstract class SecretsProvider {
|
||||||
abstract hasSecret(name: string): boolean;
|
abstract hasSecret(name: string): boolean;
|
||||||
abstract getSecretNames(): string[];
|
abstract getSecretNames(): string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
export type N8nInstanceType = 'main' | 'webhook' | 'worker';
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ import {
|
||||||
SETTINGS_LICENSE_CERT_KEY,
|
SETTINGS_LICENSE_CERT_KEY,
|
||||||
UNLIMITED_LICENSE_QUOTA,
|
UNLIMITED_LICENSE_QUOTA,
|
||||||
} from './constants';
|
} from './constants';
|
||||||
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './interfaces';
|
import type { BooleanLicenseFeature, NumericLicenseFeature } from './interfaces';
|
||||||
import type { RedisServicePubSubPublisher } from './services/redis/redis-service-pub-sub-publisher';
|
import type { RedisServicePubSubPublisher } from './services/redis/redis-service-pub-sub-publisher';
|
||||||
import { RedisService } from './services/redis.service';
|
import { RedisService } from './services/redis.service';
|
||||||
|
|
||||||
|
@ -46,8 +46,8 @@ export class License {
|
||||||
/**
|
/**
|
||||||
* Whether this instance should renew the license - on init and periodically.
|
* Whether this instance should renew the license - on init and periodically.
|
||||||
*/
|
*/
|
||||||
private renewalEnabled(instanceType: N8nInstanceType) {
|
private renewalEnabled() {
|
||||||
if (instanceType !== 'main') return false;
|
if (this.instanceSettings.instanceType !== 'main') return false;
|
||||||
|
|
||||||
const autoRenewEnabled = config.getEnv('license.autoRenewEnabled');
|
const autoRenewEnabled = config.getEnv('license.autoRenewEnabled');
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ export class License {
|
||||||
return autoRenewEnabled;
|
return autoRenewEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
async init(instanceType: N8nInstanceType = 'main', forceRecreate = false) {
|
async init(forceRecreate = false) {
|
||||||
if (this.manager && !forceRecreate) {
|
if (this.manager && !forceRecreate) {
|
||||||
this.logger.warn('License manager already initialized or shutting down');
|
this.logger.warn('License manager already initialized or shutting down');
|
||||||
return;
|
return;
|
||||||
|
@ -73,6 +73,7 @@ export class License {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const { instanceType } = this.instanceSettings;
|
||||||
const isMainInstance = instanceType === 'main';
|
const isMainInstance = instanceType === 'main';
|
||||||
const server = config.getEnv('license.serverUrl');
|
const server = config.getEnv('license.serverUrl');
|
||||||
const offlineMode = !isMainInstance;
|
const offlineMode = !isMainInstance;
|
||||||
|
@ -90,7 +91,7 @@ export class License {
|
||||||
? async () => await this.licenseMetricsService.collectPassthroughData()
|
? async () => await this.licenseMetricsService.collectPassthroughData()
|
||||||
: async () => ({});
|
: async () => ({});
|
||||||
|
|
||||||
const renewalEnabled = this.renewalEnabled(instanceType);
|
const renewalEnabled = this.renewalEnabled();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.manager = new LicenseManager({
|
this.manager = new LicenseManager({
|
||||||
|
@ -399,7 +400,7 @@ export class License {
|
||||||
|
|
||||||
async reinit() {
|
async reinit() {
|
||||||
this.manager?.reset();
|
this.manager?.reset();
|
||||||
await this.init('main', true);
|
await this.init(true);
|
||||||
this.logger.debug('License reinitialized');
|
this.logger.debug('License reinitialized');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import { InstanceSettings } from 'n8n-core';
|
||||||
import { ApplicationError } from 'n8n-workflow';
|
import { ApplicationError } from 'n8n-workflow';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
|
|
||||||
import config from '@/config';
|
|
||||||
import type { OrchestrationService } from '@/services/orchestration.service';
|
import type { OrchestrationService } from '@/services/orchestration.service';
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance } from '@test/mocking';
|
||||||
|
|
||||||
|
@ -70,7 +69,8 @@ describe('ScalingService', () => {
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
config.set('generic.instanceType', 'main');
|
// @ts-expect-error readonly property
|
||||||
|
instanceSettings.instanceType = 'main';
|
||||||
instanceSettings.markAsLeader();
|
instanceSettings.markAsLeader();
|
||||||
|
|
||||||
scalingService = new ScalingService(
|
scalingService = new ScalingService(
|
||||||
|
@ -128,8 +128,8 @@ describe('ScalingService', () => {
|
||||||
|
|
||||||
describe('if worker', () => {
|
describe('if worker', () => {
|
||||||
it('should set up queue + listeners', async () => {
|
it('should set up queue + listeners', async () => {
|
||||||
// @ts-expect-error Private field
|
// @ts-expect-error readonly property
|
||||||
scalingService.instanceType = 'worker';
|
instanceSettings.instanceType = 'worker';
|
||||||
|
|
||||||
await scalingService.setupQueue();
|
await scalingService.setupQueue();
|
||||||
|
|
||||||
|
@ -141,8 +141,8 @@ describe('ScalingService', () => {
|
||||||
|
|
||||||
describe('webhook', () => {
|
describe('webhook', () => {
|
||||||
it('should set up a queue + listeners', async () => {
|
it('should set up a queue + listeners', async () => {
|
||||||
// @ts-expect-error Private field
|
// @ts-expect-error readonly property
|
||||||
scalingService.instanceType = 'webhook';
|
instanceSettings.instanceType = 'webhook';
|
||||||
|
|
||||||
await scalingService.setupQueue();
|
await scalingService.setupQueue();
|
||||||
|
|
||||||
|
@ -155,8 +155,8 @@ describe('ScalingService', () => {
|
||||||
|
|
||||||
describe('setupWorker', () => {
|
describe('setupWorker', () => {
|
||||||
it('should set up a worker with concurrency', async () => {
|
it('should set up a worker with concurrency', async () => {
|
||||||
// @ts-expect-error Private field
|
// @ts-expect-error readonly property
|
||||||
scalingService.instanceType = 'worker';
|
instanceSettings.instanceType = 'worker';
|
||||||
await scalingService.setupQueue();
|
await scalingService.setupQueue();
|
||||||
const concurrency = 5;
|
const concurrency = 5;
|
||||||
|
|
||||||
|
@ -172,8 +172,8 @@ describe('ScalingService', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw if called before queue is ready', async () => {
|
it('should throw if called before queue is ready', async () => {
|
||||||
// @ts-expect-error Private field
|
// @ts-expect-error readonly property
|
||||||
scalingService.instanceType = 'worker';
|
instanceSettings.instanceType = 'worker';
|
||||||
|
|
||||||
expect(() => scalingService.setupWorker(5)).toThrow();
|
expect(() => scalingService.setupWorker(5)).toThrow();
|
||||||
});
|
});
|
||||||
|
|
|
@ -31,8 +31,6 @@ import type {
|
||||||
export class ScalingService {
|
export class ScalingService {
|
||||||
private queue: JobQueue;
|
private queue: JobQueue;
|
||||||
|
|
||||||
private readonly instanceType = config.getEnv('generic.instanceType');
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly activeExecutions: ActiveExecutions,
|
private readonly activeExecutions: ActiveExecutions,
|
||||||
|
@ -211,9 +209,10 @@ export class ScalingService {
|
||||||
throw error;
|
throw error;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (this.instanceType === 'main' || this.instanceType === 'webhook') {
|
const { instanceType } = this.instanceSettings;
|
||||||
|
if (instanceType === 'main' || instanceType === 'webhook') {
|
||||||
this.registerMainOrWebhookListeners();
|
this.registerMainOrWebhookListeners();
|
||||||
} else if (this.instanceType === 'worker') {
|
} else if (instanceType === 'worker') {
|
||||||
this.registerWorkerListeners();
|
this.registerWorkerListeners();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -295,7 +294,7 @@ export class ScalingService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private assertWorker() {
|
private assertWorker() {
|
||||||
if (this.instanceType === 'worker') return;
|
if (this.instanceSettings.instanceType === 'worker') return;
|
||||||
|
|
||||||
throw new ApplicationError('This method must be called on a `worker` instance');
|
throw new ApplicationError('This method must be called on a `worker` instance');
|
||||||
}
|
}
|
||||||
|
@ -311,7 +310,7 @@ export class ScalingService {
|
||||||
get isQueueMetricsEnabled() {
|
get isQueueMetricsEnabled() {
|
||||||
return (
|
return (
|
||||||
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
||||||
this.instanceType === 'main' &&
|
this.instanceSettings.instanceType === 'main' &&
|
||||||
!this.orchestrationService.isMultiMainSetupEnabled
|
!this.orchestrationService.isMultiMainSetupEnabled
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,6 @@ let queueModeId: string;
|
||||||
|
|
||||||
function setDefaultConfig() {
|
function setDefaultConfig() {
|
||||||
config.set('executions.mode', 'queue');
|
config.set('executions.mode', 'queue');
|
||||||
config.set('generic.instanceType', 'main');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = {
|
const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = {
|
||||||
|
@ -73,6 +72,9 @@ describe('Orchestration Service', () => {
|
||||||
});
|
});
|
||||||
setDefaultConfig();
|
setDefaultConfig();
|
||||||
queueModeId = config.get('redis.queueModeId');
|
queueModeId = config.get('redis.queueModeId');
|
||||||
|
|
||||||
|
// @ts-expect-error readonly property
|
||||||
|
instanceSettings.instanceType = 'main';
|
||||||
});
|
});
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
|
|
|
@ -14,7 +14,7 @@ import { RedisService } from './redis.service';
|
||||||
export class OrchestrationService {
|
export class OrchestrationService {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly instanceSettings: InstanceSettings,
|
protected readonly instanceSettings: InstanceSettings,
|
||||||
private readonly redisService: RedisService,
|
private readonly redisService: RedisService,
|
||||||
readonly multiMainSetup: MultiMainSetup,
|
readonly multiMainSetup: MultiMainSetup,
|
||||||
) {}
|
) {}
|
||||||
|
@ -31,7 +31,7 @@ export class OrchestrationService {
|
||||||
return (
|
return (
|
||||||
config.getEnv('executions.mode') === 'queue' &&
|
config.getEnv('executions.mode') === 'queue' &&
|
||||||
config.getEnv('multiMainSetup.enabled') &&
|
config.getEnv('multiMainSetup.enabled') &&
|
||||||
config.getEnv('generic.instanceType') === 'main' &&
|
this.instanceSettings.instanceType === 'main' &&
|
||||||
this.isMultiMainSetupLicensed
|
this.isMultiMainSetupLicensed
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import { Container } from 'typedi';
|
import { Container } from 'typedi';
|
||||||
|
|
||||||
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
import { ActiveWorkflowManager } from '@/active-workflow-manager';
|
||||||
|
@ -17,7 +18,7 @@ import { debounceMessageReceiver, messageToRedisServiceCommandObject } from '../
|
||||||
// eslint-disable-next-line complexity
|
// eslint-disable-next-line complexity
|
||||||
export async function handleCommandMessageMain(messageString: string) {
|
export async function handleCommandMessageMain(messageString: string) {
|
||||||
const queueModeId = config.getEnv('redis.queueModeId');
|
const queueModeId = config.getEnv('redis.queueModeId');
|
||||||
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
|
const isMainInstance = Container.get(InstanceSettings).instanceType === 'main';
|
||||||
const message = messageToRedisServiceCommandObject(messageString);
|
const message = messageToRedisServiceCommandObject(messageString);
|
||||||
const logger = Container.get(Logger);
|
const logger = Container.get(Logger);
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
import { Logger } from 'winston';
|
import { Logger } from 'winston';
|
||||||
|
|
||||||
|
@ -11,7 +12,7 @@ import { messageToRedisServiceCommandObject, debounceMessageReceiver } from '../
|
||||||
|
|
||||||
export async function handleCommandMessageWebhook(messageString: string) {
|
export async function handleCommandMessageWebhook(messageString: string) {
|
||||||
const queueModeId = config.getEnv('redis.queueModeId');
|
const queueModeId = config.getEnv('redis.queueModeId');
|
||||||
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
|
const isMainInstance = Container.get(InstanceSettings).instanceType === 'main';
|
||||||
const message = messageToRedisServiceCommandObject(messageString);
|
const message = messageToRedisServiceCommandObject(messageString);
|
||||||
const logger = Container.get(Logger);
|
const logger = Container.get(Logger);
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ export class OrchestrationWebhookService extends OrchestrationService {
|
||||||
return (
|
return (
|
||||||
this.isInitialized &&
|
this.isInitialized &&
|
||||||
config.get('executions.mode') === 'queue' &&
|
config.get('executions.mode') === 'queue' &&
|
||||||
config.get('generic.instanceType') === 'webhook'
|
this.instanceSettings.instanceType === 'webhook'
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ export class OrchestrationWorkerService extends OrchestrationService {
|
||||||
return (
|
return (
|
||||||
this.isInitialized &&
|
this.isInitialized &&
|
||||||
config.get('executions.mode') === 'queue' &&
|
config.get('executions.mode') === 'queue' &&
|
||||||
config.get('generic.instanceType') === 'worker'
|
this.instanceSettings.instanceType === 'worker'
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,19 +48,12 @@ export class PruningService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private isPruningEnabled() {
|
private isPruningEnabled() {
|
||||||
if (
|
const { instanceType, isFollower } = this.instanceSettings;
|
||||||
!config.getEnv('executions.pruneData') ||
|
if (!config.getEnv('executions.pruneData') || inTest || instanceType !== 'main') {
|
||||||
inTest ||
|
|
||||||
config.get('generic.instanceType') !== 'main'
|
|
||||||
) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
if (config.getEnv('multiMainSetup.enabled') && instanceType === 'main' && isFollower) {
|
||||||
config.getEnv('multiMainSetup.enabled') &&
|
|
||||||
config.getEnv('generic.instanceType') === 'main' &&
|
|
||||||
this.instanceSettings.isFollower
|
|
||||||
) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
/* eslint-disable @typescript-eslint/no-shadow */
|
/* eslint-disable @typescript-eslint/no-shadow */
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
import { WorkflowExecute } from 'n8n-core';
|
import { InstanceSettings, WorkflowExecute } from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
ExecutionError,
|
ExecutionError,
|
||||||
IDeferredPromise,
|
IDeferredPromise,
|
||||||
|
@ -54,6 +54,7 @@ export class WorkflowRunner {
|
||||||
private readonly nodeTypes: NodeTypes,
|
private readonly nodeTypes: NodeTypes,
|
||||||
private readonly permissionChecker: PermissionChecker,
|
private readonly permissionChecker: PermissionChecker,
|
||||||
private readonly eventService: EventService,
|
private readonly eventService: EventService,
|
||||||
|
private readonly instanceSettings: InstanceSettings,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
/** The process did error */
|
/** The process did error */
|
||||||
|
@ -150,7 +151,7 @@ export class WorkflowRunner {
|
||||||
// since these calls are now done by the worker directly
|
// since these calls are now done by the worker directly
|
||||||
if (
|
if (
|
||||||
this.executionsMode !== 'queue' ||
|
this.executionsMode !== 'queue' ||
|
||||||
config.getEnv('generic.instanceType') === 'worker' ||
|
this.instanceSettings.instanceType === 'worker' ||
|
||||||
data.executionMode === 'manual'
|
data.executionMode === 'manual'
|
||||||
) {
|
) {
|
||||||
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
|
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
process.argv[2] = 'worker';
|
||||||
|
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
|
|
||||||
import { Worker } from '@/commands/worker';
|
import { Worker } from '@/commands/worker';
|
||||||
|
@ -27,6 +29,7 @@ const logStreamingEventRelay = mockInstance(LogStreamingEventRelay);
|
||||||
const orchestrationHandlerWorkerService = mockInstance(OrchestrationHandlerWorkerService);
|
const orchestrationHandlerWorkerService = mockInstance(OrchestrationHandlerWorkerService);
|
||||||
const scalingService = mockInstance(ScalingService);
|
const scalingService = mockInstance(ScalingService);
|
||||||
const orchestrationWorkerService = mockInstance(OrchestrationWorkerService);
|
const orchestrationWorkerService = mockInstance(OrchestrationWorkerService);
|
||||||
|
|
||||||
const command = setupTestCommand(Worker);
|
const command = setupTestCommand(Worker);
|
||||||
|
|
||||||
test('worker initializes all its components', async () => {
|
test('worker initializes all its components', async () => {
|
||||||
|
|
|
@ -7,12 +7,12 @@ import {
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import { agent as testAgent } from 'supertest';
|
import { agent as testAgent } from 'supertest';
|
||||||
|
|
||||||
import { AbstractServer } from '@/abstract-server';
|
|
||||||
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
|
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
|
||||||
import { ExternalHooks } from '@/external-hooks';
|
import { ExternalHooks } from '@/external-hooks';
|
||||||
import { NodeTypes } from '@/node-types';
|
import { NodeTypes } from '@/node-types';
|
||||||
import { Push } from '@/push';
|
import { Push } from '@/push';
|
||||||
import { Telemetry } from '@/telemetry';
|
import { Telemetry } from '@/telemetry';
|
||||||
|
import { WebhookServer } from '@/webhooks/webhook-server';
|
||||||
|
|
||||||
import { createUser } from './shared/db/users';
|
import { createUser } from './shared/db/users';
|
||||||
import { createWorkflow } from './shared/db/workflows';
|
import { createWorkflow } from './shared/db/workflows';
|
||||||
|
@ -49,7 +49,7 @@ describe('Webhook API', () => {
|
||||||
|
|
||||||
await initActiveWorkflowManager();
|
await initActiveWorkflowManager();
|
||||||
|
|
||||||
const server = new (class extends AbstractServer {})();
|
const server = new WebhookServer();
|
||||||
await server.start();
|
await server.start();
|
||||||
agent = testAgent(server.app);
|
agent = testAgent(server.app);
|
||||||
});
|
});
|
||||||
|
@ -152,7 +152,7 @@ describe('Webhook API', () => {
|
||||||
|
|
||||||
await initActiveWorkflowManager();
|
await initActiveWorkflowManager();
|
||||||
|
|
||||||
const server = new (class extends AbstractServer {})();
|
const server = new WebhookServer();
|
||||||
await server.start();
|
await server.start();
|
||||||
agent = testAgent(server.app);
|
agent = testAgent(server.app);
|
||||||
});
|
});
|
||||||
|
|
|
@ -4,12 +4,12 @@ import { agent as testAgent } from 'supertest';
|
||||||
import type SuperAgentTest from 'supertest/lib/agent';
|
import type SuperAgentTest from 'supertest/lib/agent';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
|
|
||||||
import { AbstractServer } from '@/abstract-server';
|
|
||||||
import { ExternalHooks } from '@/external-hooks';
|
import { ExternalHooks } from '@/external-hooks';
|
||||||
import { WaitingForms } from '@/waiting-forms';
|
import { WaitingForms } from '@/waiting-forms';
|
||||||
import { LiveWebhooks } from '@/webhooks/live-webhooks';
|
import { LiveWebhooks } from '@/webhooks/live-webhooks';
|
||||||
import { TestWebhooks } from '@/webhooks/test-webhooks';
|
import { TestWebhooks } from '@/webhooks/test-webhooks';
|
||||||
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
||||||
|
import { WebhookServer } from '@/webhooks/webhook-server';
|
||||||
import type { IWebhookResponseCallbackData } from '@/webhooks/webhook.types';
|
import type { IWebhookResponseCallbackData } from '@/webhooks/webhook.types';
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance } from '@test/mocking';
|
||||||
|
|
||||||
|
@ -26,9 +26,9 @@ describe('WebhookServer', () => {
|
||||||
mockInstance(WaitingForms);
|
mockInstance(WaitingForms);
|
||||||
|
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
const server = new (class extends AbstractServer {
|
const server = new WebhookServer();
|
||||||
testWebhooksEnabled = true;
|
// @ts-expect-error: testWebhooksEnabled is private
|
||||||
})();
|
server.testWebhooksEnabled = true;
|
||||||
await server.start();
|
await server.start();
|
||||||
agent = testAgent(server.app);
|
agent = testAgent(server.app);
|
||||||
});
|
});
|
||||||
|
|
|
@ -16,6 +16,8 @@ type Settings = ReadOnlySettings & WritableSettings;
|
||||||
|
|
||||||
type InstanceRole = 'unset' | 'leader' | 'follower';
|
type InstanceRole = 'unset' | 'leader' | 'follower';
|
||||||
|
|
||||||
|
export type InstanceType = 'main' | 'webhook' | 'worker';
|
||||||
|
|
||||||
const inTest = process.env.NODE_ENV === 'test';
|
const inTest = process.env.NODE_ENV === 'test';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
|
@ -40,6 +42,15 @@ export class InstanceSettings {
|
||||||
|
|
||||||
readonly instanceId = this.generateInstanceId();
|
readonly instanceId = this.generateInstanceId();
|
||||||
|
|
||||||
|
readonly instanceType: InstanceType;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
const command = process.argv[2];
|
||||||
|
this.instanceType = ['webhook', 'worker'].includes(command)
|
||||||
|
? (command as InstanceType)
|
||||||
|
: 'main';
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A main is:
|
* A main is:
|
||||||
* - `unset` during bootup,
|
* - `unset` during bootup,
|
||||||
|
|
|
@ -10,7 +10,7 @@ export * from './Constants';
|
||||||
export * from './Credentials';
|
export * from './Credentials';
|
||||||
export * from './DirectoryLoader';
|
export * from './DirectoryLoader';
|
||||||
export * from './Interfaces';
|
export * from './Interfaces';
|
||||||
export { InstanceSettings } from './InstanceSettings';
|
export { InstanceSettings, InstanceType } from './InstanceSettings';
|
||||||
export * from './NodeExecuteFunctions';
|
export * from './NodeExecuteFunctions';
|
||||||
export * from './WorkflowExecute';
|
export * from './WorkflowExecute';
|
||||||
export { NodeExecuteFunctions };
|
export { NodeExecuteFunctions };
|
||||||
|
|
Loading…
Reference in a new issue