feat(core): Add secrets provider reload and refactor (#7277)

This PR adds a message for queue mode which triggers an external secrets
provider reload inside the workers if the configuration has changed on
the main instance.

It also refactors some of the message handler code to remove cyclic
dependencies, as well as remove unnecessary duplicate redis clients
inside services (thanks to no more cyclic deps)
This commit is contained in:
Michael Auerswald 2023-09-28 12:57:35 +02:00 committed by GitHub
parent a80abad3af
commit 53a7502d20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 190 additions and 86 deletions

View file

@ -20,6 +20,7 @@ import { WaitingWebhooks } from '@/WaitingWebhooks';
import { webhookRequestHandler } from '@/WebhookHelpers';
import { generateHostInstanceId } from './databases/utils/generators';
import { OrchestrationService } from './services/orchestration.service';
import { OrchestrationHandlerService } from './services/orchestration.handler.service';
export abstract class AbstractServer {
protected server: Server;
@ -118,6 +119,7 @@ export abstract class AbstractServer {
if (config.getEnv('executions.mode') === 'queue') {
// will start the redis connections
await Container.get(OrchestrationService).init();
await Container.get(OrchestrationHandlerService).init();
}
}

View file

@ -20,6 +20,7 @@ import {
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { OrchestrationService } from '@/services/orchestration.service';
const logger = getLogger();
@ -70,6 +71,21 @@ export class ExternalSecretsManager {
Object.values(this.initRetryTimeouts).forEach((v) => clearTimeout(v));
}
async reloadAllProviders(backoff?: number) {
logger.debug('Reloading all external secrets providers');
const providers = this.getProviderNames();
if (!providers) {
return;
}
for (const provider of providers) {
await this.reloadProvider(provider, backoff);
}
}
async broadcastReloadExternalSecretsProviders() {
await Container.get(OrchestrationService).broadcastReloadExternalSecretsProviders();
}
private async getEncryptionKey(): Promise<string> {
return UserSettings.getEncryptionKey();
}
@ -274,6 +290,7 @@ export class ExternalSecretsManager {
await this.saveAndSetSettings(settings, this.settingsRepo);
this.cachedSettings = settings;
await this.reloadProvider(provider);
await this.broadcastReloadExternalSecretsProviders();
void this.trackProviderSave(provider, isNewProvider, userId);
}
@ -293,6 +310,7 @@ export class ExternalSecretsManager {
this.cachedSettings = settings;
await this.reloadProvider(provider);
await this.updateSecrets();
await this.broadcastReloadExternalSecretsProviders();
}
private async trackProviderSave(vaultType: string, isNew: boolean, userId?: string) {
@ -373,6 +391,7 @@ export class ExternalSecretsManager {
}
try {
await this.providers[provider].update();
await this.broadcastReloadExternalSecretsProviders();
return true;
} catch {
return false;

View file

@ -94,7 +94,6 @@ export abstract class BaseCommand extends Command {
}
protected setInstanceQueueModeId() {
if (config.getEnv('executions.mode') === 'queue') {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
@ -102,7 +101,6 @@ export abstract class BaseCommand extends Command {
this.queueModeId = generateHostInstanceId(this.instanceType);
config.set('redis.queueModeId', this.queueModeId);
}
}
protected async stopProcess() {
// This needs to be overridden

View file

@ -29,12 +29,9 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container, { Service } from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { RedisService } from '@/services/redis.service';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import { EVENT_BUS_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { OrchestrationService } from '../../services/orchestration.service';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@ -54,10 +51,6 @@ export class MessageEventBus extends EventEmitter {
isInitialized: boolean;
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber;
logWriter: MessageEventBusLogWriter;
destinations: {
@ -91,20 +84,6 @@ export class MessageEventBus extends EventEmitter {
return;
}
if (config.getEnv('executions.mode') === 'queue') {
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
await this.redisSubscriber.subscribeToEventLog();
this.redisSubscriber.addMessageHandler(
'MessageEventBusMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleRedisEventBusMessage(messageString);
}
},
);
}
LoggerProxy.debug('Initializing event bus...');
const savedEventDestinations = await Db.collections.EventDestinations.find({});
@ -211,7 +190,7 @@ export class MessageEventBus extends EventEmitter {
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate();
}
return destination;
}
@ -237,7 +216,7 @@ export class MessageEventBus extends EventEmitter {
delete this.destinations[id];
}
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate();
}
return result;
}
@ -253,14 +232,6 @@ export class MessageEventBus extends EventEmitter {
return eventData;
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await this.redisPublisher.publishToCommandChannel({
command: 'restartEventBus',
});
}
}
private async trySendingUnsent(msgs?: EventMessageTypes[]) {
const unsentMessages = msgs ?? (await this.getEventsUnsent());
if (unsentMessages.length > 0) {
@ -281,7 +252,6 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
await this.redisSubscriber?.unSubscribeFromEventLog();
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');
}

View file

@ -0,0 +1,47 @@
import Container, { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './redis/RedisServiceHelper';
import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from './orchestration/handleCommandMessage';
import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus';
@Service()
export class OrchestrationHandlerService {
redisSubscriber: RedisServicePubSubSubscriber;
constructor(readonly redisService: RedisService) {}
async init() {
await this.initSubscriber();
}
async shutdown() {
await this.redisSubscriber?.destroy();
}
private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel();
await this.redisSubscriber.subscribeToEventLog();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessage(messageString);
} else if (channel === EVENT_BUS_REDIS_CHANNEL) {
await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString);
}
},
);
}
}

View file

@ -1,10 +1,7 @@
import { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper';
import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from './orchestration/handleCommandMessage';
import config from '@/config';
@Service()
export class OrchestrationService {
@ -12,44 +9,29 @@ export class OrchestrationService {
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber;
get isQueueMode() {
return config.getEnv('executions.mode') === 'queue';
}
constructor(readonly redisService: RedisService) {}
async init() {
await this.initPublisher();
await this.initSubscriber();
this.initialized = true;
}
async shutdown() {
await this.redisPublisher?.destroy();
await this.redisSubscriber?.destroy();
}
private async initPublisher() {
this.redisPublisher = await this.redisService.getPubSubPublisher();
}
private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessage(messageString);
}
},
);
}
async getWorkerStatus(id?: string) {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
@ -60,6 +42,9 @@ export class OrchestrationService {
}
async getWorkerIds() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
@ -67,4 +52,28 @@ export class OrchestrationService {
command: 'getId',
});
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'restartEventBus',
});
}
async broadcastReloadExternalSecretsProviders() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadExternalSecretsProviders',
});
}
}

View file

@ -1,14 +1,23 @@
import { LoggerProxy } from 'n8n-workflow';
import { messageToRedisServiceCommandObject } from './helpers';
import config from '@/config';
import { MessageEventBus } from '../../eventbus/MessageEventBus/MessageEventBus';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import Container from 'typedi';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import type { N8nInstanceType } from '@/Interfaces';
import { License } from '@/License';
// this function handles commands sent to the MAIN instance. the workers handle their own commands
export async function handleCommandMessage(messageString: string) {
const queueModeId = config.get('redis.queueModeId');
const instanceType = config.get('generic.instanceType') as N8nInstanceType;
const isMainInstance = instanceType === 'main';
const message = messageToRedisServiceCommandObject(messageString);
if (message) {
LoggerProxy.debug(
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
);
if (
message.senderId === queueModeId ||
(message.targets && !message.targets.includes(queueModeId))
@ -21,16 +30,19 @@ export async function handleCommandMessage(messageString: string) {
}
switch (message.command) {
case 'reloadLicense':
// at this point in time, only a single main instance is supported, thus this
// command _should_ never be caught currently (which is why we log a warning)
LoggerProxy.warn(
if (isMainInstance) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
LoggerProxy.error(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
);
// once multiple main instances are supported, this command should be handled
// await Container.get(License).reload();
return message;
}
await Container.get(License).reload();
break;
case 'restartEventBus':
await Container.get(MessageEventBus).restart();
case 'reloadExternalSecretsProviders':
await Container.get(ExternalSecretsManager).reloadAllProviders();
default:
break;
}

View file

@ -3,7 +3,8 @@ export type RedisServiceCommand =
| 'getId'
| 'restartEventBus'
| 'stopWorker'
| 'reloadLicense';
| 'reloadLicense'
| 'reloadExternalSecretsProviders';
/**
* An object to be sent via Redis pub/sub from the main process to the workers.
@ -49,6 +50,13 @@ export type RedisServiceWorkerResponseObject = {
error?: string;
};
}
| {
command: 'reloadExternalSecretsProviders';
payload: {
result: 'success' | 'error';
error?: string;
};
}
| {
command: 'stopWorker';
}

View file

@ -6,6 +6,7 @@ import * as os from 'os';
import Container from 'typedi';
import { License } from '@/License';
import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '../ExternalSecrets/ExternalSecretsManager.ee';
export function getWorkerCommandReceivedHandler(options: {
queueModeId: string;
@ -26,6 +27,9 @@ export function getWorkerCommandReceivedHandler(options: {
return;
}
if (message) {
LoggerProxy.debug(
`RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`,
);
if (message.targets && !message.targets.includes(options.queueModeId)) {
return; // early return if the message is not for this worker
}
@ -59,6 +63,7 @@ export function getWorkerCommandReceivedHandler(options: {
});
break;
case 'restartEventBus':
try {
await Container.get(MessageEventBus).restart();
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
@ -67,6 +72,37 @@ export function getWorkerCommandReceivedHandler(options: {
result: 'success',
},
});
} catch (error) {
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
payload: {
result: 'error',
error: (error as Error).message,
},
});
}
break;
case 'reloadExternalSecretsProviders':
try {
await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
payload: {
result: 'success',
},
});
} catch (error) {
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
payload: {
result: 'error',
error: (error as Error).message,
},
});
}
break;
case 'reloadLicense':
await Container.get(License).reload();

View file

@ -9,8 +9,10 @@ import { RedisService } from '@/services/redis.service';
import { mockInstance } from '../../integration/shared/utils';
import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage';
import { OrchestrationHandlerService } from '../../../src/services/orchestration.handler.service';
const os = Container.get(OrchestrationService);
const handler = Container.get(OrchestrationHandlerService);
let queueModeId: string;
@ -76,8 +78,9 @@ describe('Orchestration Service', () => {
test('should initialize', async () => {
await os.init();
await handler.init();
expect(os.redisPublisher).toBeDefined();
expect(os.redisSubscriber).toBeDefined();
expect(handler.redisSubscriber).toBeDefined();
expect(queueModeId).toBeDefined();
});
@ -89,7 +92,7 @@ describe('Orchestration Service', () => {
});
test('should handle command messages from others', async () => {
jest.spyOn(LoggerProxy, 'warn');
jest.spyOn(LoggerProxy, 'error');
const responseFalseId = await handleCommandMessage(
JSON.stringify({
senderId: 'test',
@ -99,8 +102,8 @@ describe('Orchestration Service', () => {
expect(responseFalseId).toBeDefined();
expect(responseFalseId!.command).toEqual('reloadLicense');
expect(responseFalseId!.senderId).toEqual('test');
expect(LoggerProxy.warn).toHaveBeenCalled();
jest.spyOn(LoggerProxy, 'warn').mockRestore();
expect(LoggerProxy.error).toHaveBeenCalled();
jest.spyOn(LoggerProxy, 'error').mockRestore();
});
test('should reject command messages from iteslf', async () => {