mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
feat(core): Set up leader selection for multiple main instances (#7527)
https://linear.app/n8n/issue/PAY-933/set-up-leader-selection-for-multiple-main-instances - [x] Set up new envs - [x] Add config and license checks - [x] Implement `MultiMainInstancePublisher` - [x] Expand `RedisServicePubSubPublisher` to support `MultiMainInstancePublisher` - [x] Init `MultiMainInstancePublisher` on startup and destroy on shutdown - [x] Add to sandbox plans - [x] Test manually Note: This is only for setup - coordinating in reaction to leadership changes will come in later PRs.
This commit is contained in:
parent
3b5e181e66
commit
442c73e63b
|
@ -19,7 +19,7 @@ import {
|
|||
import { License } from '@/License';
|
||||
import { InternalHooks } from '@/InternalHooks';
|
||||
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
|
||||
import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
|
||||
@Service()
|
||||
export class ExternalSecretsManager {
|
||||
|
@ -82,7 +82,7 @@ export class ExternalSecretsManager {
|
|||
}
|
||||
|
||||
async broadcastReloadExternalSecretsProviders() {
|
||||
await Container.get(OrchestrationMainService).broadcastReloadExternalSecretsProviders();
|
||||
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
|
||||
}
|
||||
|
||||
private decryptSecretsSettings(value: string): ExternalSecretsSettings {
|
||||
|
|
|
@ -23,6 +23,14 @@ type FeatureReturnType = Partial<
|
|||
} & { [K in NumericLicenseFeature]: number } & { [K in BooleanLicenseFeature]: boolean }
|
||||
>;
|
||||
|
||||
export class FeatureNotLicensedError extends Error {
|
||||
constructor(feature: (typeof LICENSE_FEATURES)[keyof typeof LICENSE_FEATURES]) {
|
||||
super(
|
||||
`Your license does not allow for ${feature}. To enable ${feature}, please upgrade to a license that supports this feature.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class License {
|
||||
private manager: LicenseManager | undefined;
|
||||
|
@ -204,6 +212,10 @@ export class License {
|
|||
return this.isFeatureEnabled(LICENSE_FEATURES.BINARY_DATA_S3);
|
||||
}
|
||||
|
||||
isMultipleMainInstancesLicensed() {
|
||||
return this.isFeatureEnabled(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
|
||||
}
|
||||
|
||||
isVariablesEnabled() {
|
||||
return this.isFeatureEnabled(LICENSE_FEATURES.VARIABLES);
|
||||
}
|
||||
|
|
|
@ -21,14 +21,14 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
|||
import * as Db from '@/Db';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { Server } from '@/Server';
|
||||
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
|
||||
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR, LICENSE_FEATURES } from '@/constants';
|
||||
import { eventBus } from '@/eventbus';
|
||||
import { BaseCommand } from './BaseCommand';
|
||||
import { InternalHooks } from '@/InternalHooks';
|
||||
import { License } from '@/License';
|
||||
import { License, FeatureNotLicensedError } from '@/License';
|
||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||
import { IConfig } from '@oclif/config';
|
||||
import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
|
||||
|
@ -112,6 +112,14 @@ export class Start extends BaseCommand {
|
|||
|
||||
Container.get(ExecutionRepository).clearTimers();
|
||||
|
||||
if (config.getEnv('leaderSelection.enabled')) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
await Container.get(MultiMainInstancePublisher).destroy();
|
||||
}
|
||||
|
||||
await Container.get(InternalHooks).onN8nStop();
|
||||
|
||||
// Wait for active workflow executions to finish
|
||||
|
@ -215,10 +223,24 @@ export class Start extends BaseCommand {
|
|||
}
|
||||
|
||||
async initOrchestration() {
|
||||
if (config.get('executions.mode') === 'queue') {
|
||||
await Container.get(OrchestrationMainService).init();
|
||||
if (config.get('executions.mode') !== 'queue') return;
|
||||
|
||||
if (!config.get('leaderSelection.enabled')) {
|
||||
await Container.get(SingleMainInstancePublisher).init();
|
||||
await Container.get(OrchestrationHandlerMainService).init();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!Container.get(License).isMultipleMainInstancesLicensed()) {
|
||||
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
|
||||
}
|
||||
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
await Container.get(MultiMainInstancePublisher).init();
|
||||
await Container.get(OrchestrationHandlerMainService).init();
|
||||
}
|
||||
|
||||
async run() {
|
||||
|
|
|
@ -1323,4 +1323,25 @@ export const schema = {
|
|||
env: 'N8N_WORKFLOW_HISTORY_PRUNE_TIME',
|
||||
},
|
||||
},
|
||||
|
||||
leaderSelection: {
|
||||
enabled: {
|
||||
doc: 'Whether to enable leader selection for multiple main instances (license required)',
|
||||
format: Boolean,
|
||||
default: false,
|
||||
env: 'N8N_LEADER_SELECTION_ENABLED',
|
||||
},
|
||||
ttl: {
|
||||
doc: 'Time to live in Redis for leader selection key, in seconds',
|
||||
format: Number,
|
||||
default: 10,
|
||||
env: 'N8N_LEADER_SELECTION_KEY_TTL',
|
||||
},
|
||||
interval: {
|
||||
doc: 'Interval in Redis for leader selection check, in seconds',
|
||||
format: Number,
|
||||
default: 3,
|
||||
env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL',
|
||||
},
|
||||
},
|
||||
};
|
||||
|
|
|
@ -85,6 +85,7 @@ export const LICENSE_FEATURES = {
|
|||
WORKFLOW_HISTORY: 'feat:workflowHistory',
|
||||
DEBUG_IN_EDITOR: 'feat:debugInEditor',
|
||||
BINARY_DATA_S3: 'feat:binaryDataS3',
|
||||
MULTIPLE_MAIN_INSTANCES: 'feat:multipleMainInstances',
|
||||
} as const;
|
||||
|
||||
export const LICENSE_QUOTAS = {
|
||||
|
|
|
@ -67,6 +67,7 @@ export class E2EController {
|
|||
[LICENSE_FEATURES.WORKFLOW_HISTORY]: false,
|
||||
[LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
|
||||
[LICENSE_FEATURES.BINARY_DATA_S3]: false,
|
||||
[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES]: false,
|
||||
};
|
||||
|
||||
constructor(
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
import { Authorized, Get, RestController } from '@/decorators';
|
||||
import { OrchestrationRequest } from '@/requests';
|
||||
import { Service } from 'typedi';
|
||||
import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
|
||||
@Authorized(['global', 'owner'])
|
||||
@RestController('/orchestration')
|
||||
@Service()
|
||||
export class OrchestrationController {
|
||||
constructor(private readonly orchestrationService: OrchestrationMainService) {}
|
||||
constructor(private readonly orchestrationService: SingleMainInstancePublisher) {}
|
||||
|
||||
/**
|
||||
* These endpoint currently do not return anything, they just trigger the messsage to
|
||||
|
|
|
@ -32,7 +32,7 @@ import { Container, Service } from 'typedi';
|
|||
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
|
||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { Logger } from '@/Logger';
|
||||
|
||||
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
|
||||
|
@ -207,7 +207,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
this.destinations[destination.getId()].startListening();
|
||||
if (notifyWorkers) {
|
||||
await Container.get(
|
||||
OrchestrationMainService,
|
||||
SingleMainInstancePublisher,
|
||||
).broadcastRestartEventbusAfterDestinationUpdate();
|
||||
}
|
||||
return destination;
|
||||
|
@ -235,7 +235,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
}
|
||||
if (notifyWorkers) {
|
||||
await Container.get(
|
||||
OrchestrationMainService,
|
||||
SingleMainInstancePublisher,
|
||||
).broadcastRestartEventbusAfterDestinationUpdate();
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -6,6 +6,8 @@ import config from '@/config';
|
|||
export abstract class OrchestrationService {
|
||||
protected initialized = false;
|
||||
|
||||
protected queueModeId: string;
|
||||
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
|
||||
readonly redisService: RedisService;
|
||||
|
@ -28,6 +30,7 @@ export abstract class OrchestrationService {
|
|||
|
||||
constructor() {
|
||||
this.redisService = Container.get(RedisService);
|
||||
this.queueModeId = config.getEnv('redis.queueModeId');
|
||||
}
|
||||
|
||||
sanityCheck(): boolean {
|
||||
|
@ -44,7 +47,7 @@ export abstract class OrchestrationService {
|
|||
this.initialized = false;
|
||||
}
|
||||
|
||||
private async initPublisher() {
|
||||
protected async initPublisher() {
|
||||
this.redisPublisher = await this.redisService.getPubSubPublisher();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
import config from '@/config';
|
||||
import { Service } from 'typedi';
|
||||
import { TIME } from '@/constants';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
|
||||
|
||||
/**
|
||||
* For use in main instance, in multiple main instances cluster.
|
||||
*/
|
||||
@Service()
|
||||
export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||
private id = this.queueModeId;
|
||||
|
||||
private leaderId: string | undefined;
|
||||
|
||||
private get isLeader() {
|
||||
return this.id === this.leaderId;
|
||||
}
|
||||
|
||||
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
||||
|
||||
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
|
||||
|
||||
private leaderCheckInterval: NodeJS.Timer | undefined;
|
||||
|
||||
async init() {
|
||||
await this.initPublisher();
|
||||
|
||||
this.initialized = true;
|
||||
|
||||
await this.tryBecomeLeader();
|
||||
|
||||
this.leaderCheckInterval = setInterval(
|
||||
async () => {
|
||||
await this.checkLeader();
|
||||
},
|
||||
config.getEnv('leaderSelection.interval') * TIME.SECOND,
|
||||
);
|
||||
}
|
||||
|
||||
async destroy() {
|
||||
clearInterval(this.leaderCheckInterval);
|
||||
|
||||
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
|
||||
}
|
||||
|
||||
private async checkLeader() {
|
||||
if (!this.redisPublisher.redisClient) return;
|
||||
|
||||
const leaderId = await this.redisPublisher.get(this.leaderKey);
|
||||
|
||||
if (!leaderId) {
|
||||
this.logger.debug('Leadership vacant, attempting to become leader...');
|
||||
await this.tryBecomeLeader();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isLeader) {
|
||||
this.logger.debug(`Leader is this instance "${this.id}"`);
|
||||
|
||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||
} else {
|
||||
this.logger.debug(`Leader is other instance "${leaderId}"`);
|
||||
|
||||
this.leaderId = leaderId;
|
||||
}
|
||||
}
|
||||
|
||||
private async tryBecomeLeader() {
|
||||
if (this.isLeader || !this.redisPublisher.redisClient) return;
|
||||
|
||||
// this can only succeed if leadership is currently vacant
|
||||
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
|
||||
|
||||
if (keySetSuccessfully) {
|
||||
this.logger.debug(`Leader is now this instance "${this.id}"`);
|
||||
|
||||
this.leaderId = this.id;
|
||||
|
||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
import { Logger } from '@/Logger';
|
||||
import { Service } from 'typedi';
|
||||
import { OrchestrationService } from '@/services/orchestration.base.service';
|
||||
|
||||
/**
|
||||
* For use in main instance, in single main instance scenario.
|
||||
*/
|
||||
@Service()
|
||||
export class SingleMainInstancePublisher extends OrchestrationService {
|
||||
constructor(protected readonly logger: Logger) {
|
||||
super();
|
||||
}
|
||||
|
||||
sanityCheck() {
|
||||
return this.initialized && this.isQueueMode && this.isMainInstance;
|
||||
}
|
||||
|
||||
async getWorkerStatus(id?: string) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'getStatus';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command,
|
||||
targets: id ? [id] : undefined,
|
||||
});
|
||||
}
|
||||
|
||||
async getWorkerIds() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'getId';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
|
||||
async broadcastRestartEventbusAfterDestinationUpdate() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'restartEventBus';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
|
||||
async broadcastReloadExternalSecretsProviders() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'reloadExternalSecretsProviders';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
}
|
|
@ -34,7 +34,8 @@ export async function handleCommandMessageMain(messageString: string) {
|
|||
};
|
||||
return message;
|
||||
}
|
||||
if (isMainInstance) {
|
||||
|
||||
if (isMainInstance && !config.getEnv('leaderSelection.enabled')) {
|
||||
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
|
||||
logger.error(
|
||||
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
import { Service } from 'typedi';
|
||||
import { OrchestrationService } from '../../orchestration.base.service';
|
||||
|
||||
@Service()
|
||||
export class OrchestrationMainService extends OrchestrationService {
|
||||
sanityCheck(): boolean {
|
||||
return this.initialized && this.isQueueMode && this.isMainInstance;
|
||||
}
|
||||
|
||||
async getWorkerStatus(id?: string) {
|
||||
if (!this.sanityCheck()) return;
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'getStatus',
|
||||
targets: id ? [id] : undefined,
|
||||
});
|
||||
}
|
||||
|
||||
async getWorkerIds() {
|
||||
if (!this.sanityCheck()) return;
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'getId',
|
||||
});
|
||||
}
|
||||
|
||||
async broadcastRestartEventbusAfterDestinationUpdate() {
|
||||
if (!this.sanityCheck()) return;
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'restartEventBus',
|
||||
});
|
||||
}
|
||||
|
||||
async broadcastReloadExternalSecretsProviders() {
|
||||
if (!this.sanityCheck()) return;
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'reloadExternalSecretsProviders',
|
||||
});
|
||||
}
|
||||
}
|
|
@ -39,4 +39,30 @@ export class RedisServicePubSubPublisher extends RedisServiceBaseSender {
|
|||
async publishToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise<void> {
|
||||
await this.publish(WORKER_RESPONSE_REDIS_CHANNEL, JSON.stringify(message));
|
||||
}
|
||||
|
||||
async setIfNotExists(key: string, value: string) {
|
||||
if (!this.redisClient) await this.init();
|
||||
|
||||
const success = await this.redisClient?.setnx(key, value);
|
||||
|
||||
return !!success;
|
||||
}
|
||||
|
||||
async setExpiration(key: string, ttl: number) {
|
||||
if (!this.redisClient) await this.init();
|
||||
|
||||
await this.redisClient?.expire(key, ttl);
|
||||
}
|
||||
|
||||
async get(key: string) {
|
||||
if (!this.redisClient) await this.init();
|
||||
|
||||
return this.redisClient?.get(key);
|
||||
}
|
||||
|
||||
async clear(key: string) {
|
||||
if (!this.redisClient) await this.init();
|
||||
|
||||
await this.redisClient?.del(key);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import Container from 'typedi';
|
||||
import config from '@/config';
|
||||
import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
|
||||
import { eventBus } from '@/eventbus';
|
||||
import { RedisService } from '@/services/redis.service';
|
||||
|
@ -12,7 +12,7 @@ import * as helpers from '@/services/orchestration/helpers';
|
|||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||
import { Logger } from '@/Logger';
|
||||
|
||||
const os = Container.get(OrchestrationMainService);
|
||||
const os = Container.get(SingleMainInstancePublisher);
|
||||
const handler = Container.get(OrchestrationHandlerMainService);
|
||||
|
||||
let queueModeId: string;
|
||||
|
|
Loading…
Reference in a new issue