refactor(core): Make orchestration service smaller (#11275)

This commit is contained in:
Iván Ovejero 2024-10-16 17:34:32 +02:00 committed by GitHub
parent bf28fbefe5
commit d37acdb873
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 158 additions and 157 deletions

View file

@ -13,7 +13,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
const orchestrationService = new OrchestrationService(mock(), multiMainSetup);
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const execution = mock<IExecutionResponse>({

View file

@ -48,6 +48,7 @@ import { WorkflowExecutionService } from '@/workflows/workflow-execution.service
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { ExecutionService } from './executions/execution.service';
import { Publisher } from './scaling/pubsub/publisher.service';
interface QueuedActivation {
activationMode: WorkflowActivateMode;
@ -75,6 +76,7 @@ export class ActiveWorkflowManager {
private readonly activeWorkflowsService: ActiveWorkflowsService,
private readonly workflowExecutionService: WorkflowExecutionService,
private readonly instanceSettings: InstanceSettings,
private readonly publisher: Publisher,
) {}
async init() {
@ -517,8 +519,9 @@ export class ActiveWorkflowManager {
{ shouldPublish } = { shouldPublish: true },
) {
if (this.orchestrationService.isMultiMainSetupEnabled && shouldPublish) {
await this.orchestrationService.publish('add-webhooks-triggers-and-pollers', {
workflowId,
void this.publisher.publishCommand({
command: 'add-webhooks-triggers-and-pollers',
payload: { workflowId },
});
return;
@ -526,8 +529,8 @@ export class ActiveWorkflowManager {
let workflow: Workflow;
const shouldAddWebhooks = this.orchestrationService.shouldAddWebhooks(activationMode);
const shouldAddTriggersAndPollers = this.orchestrationService.shouldAddTriggersAndPollers();
const shouldAddWebhooks = this.shouldAddWebhooks(activationMode);
const shouldAddTriggersAndPollers = this.shouldAddTriggersAndPollers();
const shouldDisplayActivationMessage =
(shouldAddWebhooks || shouldAddTriggersAndPollers) &&
@ -717,7 +720,10 @@ export class ActiveWorkflowManager {
);
}
await this.orchestrationService.publish('remove-triggers-and-pollers', { workflowId });
void this.publisher.publishCommand({
command: 'remove-triggers-and-pollers',
payload: { workflowId },
});
return;
}
@ -810,4 +816,29 @@ export class ActiveWorkflowManager {
async removeActivationError(workflowId: string) {
await this.activationErrorsService.deregister(workflowId);
}
/**
* Whether this instance may add webhooks to the `webhook_entity` table.
*/
shouldAddWebhooks(activationMode: WorkflowActivateMode) {
// Always try to populate the webhook entity table as well as register the webhooks
// to prevent issues with users upgrading from a version < 1.15, where the webhook entity
// was cleared on shutdown to anything past 1.28.0, where we stopped populating it on init,
// causing all webhooks to break
if (activationMode === 'init') return true;
if (activationMode === 'leadershipChange') return false;
return this.instanceSettings.isLeader; // 'update' or 'activate'
}
/**
* Whether this instance may add triggers and pollers to memory.
*
* In both single- and multi-main setup, only the leader is allowed to manage
* triggers and pollers in memory, to ensure they are not duplicated.
*/
shouldAddTriggersAndPollers() {
return this.instanceSettings.isLeader;
}
}

View file

@ -1,31 +1,23 @@
import { Post, RestController, GlobalScope } from '@/decorators';
import { License } from '@/license';
import { OrchestrationRequest } from '@/requests';
import { OrchestrationService } from '@/services/orchestration.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
@RestController('/orchestration')
export class OrchestrationController {
constructor(
private readonly orchestrationService: OrchestrationService,
private readonly licenseService: License,
private readonly publisher: Publisher,
) {}
/**
* These endpoints do not return anything, they just trigger the message to
* This endpoint does not return anything, it just triggers the message to
* the workers to respond on Redis with their status.
*/
@GlobalScope('orchestration:read')
@Post('/worker/status/:id')
async getWorkersStatus(req: OrchestrationRequest.Get) {
if (!this.licenseService.isWorkerViewLicensed()) return;
const id = req.params.id;
return await this.orchestrationService.getWorkerStatus(id);
}
@GlobalScope('orchestration:read')
@Post('/worker/status')
async getWorkersStatusAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerStatus();
return await this.publisher.publishCommand({ command: 'get-worker-status' });
}
}

View file

@ -14,7 +14,7 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { License } from '@/license';
import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { ExecutionRecoveryService } from '../../executions/execution-recovery.service';
import type { EventMessageTypes } from '../event-message-classes/';
@ -70,7 +70,7 @@ export class MessageEventBus extends EventEmitter {
private readonly executionRepository: ExecutionRepository,
private readonly eventDestinationsRepository: EventDestinationsRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly orchestrationService: OrchestrationService,
private readonly publisher: Publisher,
private readonly recoveryService: ExecutionRecoveryService,
private readonly license: License,
private readonly globalConfig: GlobalConfig,
@ -210,7 +210,7 @@ export class MessageEventBus extends EventEmitter {
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await this.orchestrationService.publish('restart-event-bus');
void this.publisher.publishCommand({ command: 'restart-event-bus' });
}
return destination;
}
@ -236,7 +236,7 @@ export class MessageEventBus extends EventEmitter {
delete this.destinations[id];
}
if (notifyWorkers) {
await this.orchestrationService.publish('restart-event-bus');
void this.publisher.publishCommand({ command: 'restart-event-bus' });
}
return result;
}

View file

@ -55,6 +55,7 @@ describe('External Secrets Manager', () => {
providersMock,
cipher,
mock(),
mock(),
);
});

View file

@ -1,6 +1,6 @@
import { Cipher } from 'n8n-core';
import { jsonParse, type IDataObject, ApplicationError } from 'n8n-workflow';
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { SettingsRepository } from '@/databases/repositories/settings.repository';
import { EventService } from '@/events/event.service';
@ -11,7 +11,7 @@ import type {
} from '@/interfaces';
import { License } from '@/license';
import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { EXTERNAL_SECRETS_INITIAL_BACKOFF, EXTERNAL_SECRETS_MAX_BACKOFF } from './constants';
import { updateIntervalTime } from './external-secrets-helper.ee';
@ -38,6 +38,7 @@ export class ExternalSecretsManager {
private readonly secretsProviders: ExternalSecretsProviders,
private readonly cipher: Cipher,
private readonly eventService: EventService,
private readonly publisher: Publisher,
) {}
async init(): Promise<void> {
@ -78,8 +79,8 @@ export class ExternalSecretsManager {
}
}
async broadcastReloadExternalSecretsProviders() {
await Container.get(OrchestrationService).publish('reload-external-secrets-providers');
broadcastReloadExternalSecretsProviders() {
void this.publisher.publishCommand({ command: 'reload-external-secrets-providers' });
}
private decryptSecretsSettings(value: string): ExternalSecretsSettings {
@ -280,7 +281,7 @@ export class ExternalSecretsManager {
await this.saveAndSetSettings(settings, this.settingsRepo);
this.cachedSettings = settings;
await this.reloadProvider(provider);
await this.broadcastReloadExternalSecretsProviders();
this.broadcastReloadExternalSecretsProviders();
void this.trackProviderSave(provider, isNewProvider, userId);
}
@ -300,7 +301,7 @@ export class ExternalSecretsManager {
this.cachedSettings = settings;
await this.reloadProvider(provider);
await this.updateSecrets();
await this.broadcastReloadExternalSecretsProviders();
this.broadcastReloadExternalSecretsProviders();
}
private async trackProviderSave(vaultType: string, isNew: boolean, userId?: string) {
@ -380,7 +381,7 @@ export class ExternalSecretsManager {
}
try {
await this.providers[provider].update();
await this.broadcastReloadExternalSecretsProviders();
this.broadcastReloadExternalSecretsProviders();
return true;
} catch {
return false;

View file

@ -20,7 +20,7 @@ describe('Push', () => {
test('should validate pushRef on requests for websocket backend', () => {
config.set('push.backend', 'websocket');
const push = new Push(mock());
const push = new Push(mock(), mock());
const ws = mock<WebSocket>();
const request = mock<WebSocketPushRequest>({ user, ws });
request.query = { pushRef: '' };
@ -33,7 +33,7 @@ describe('Push', () => {
test('should validate pushRef on requests for SSE backend', () => {
config.set('push.backend', 'sse');
const push = new Push(mock());
const push = new Push(mock(), mock());
const request = mock<SSEPushRequest>({ user, ws: undefined });
request.query = { pushRef: '' };
expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError);

View file

@ -12,6 +12,7 @@ import config from '@/config';
import type { User } from '@/databases/entities/user';
import { OnShutdown } from '@/decorators/on-shutdown';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { TypedEmitter } from '@/typed-emitter';
@ -39,7 +40,10 @@ export class Push extends TypedEmitter<PushEvents> {
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
constructor(private readonly orchestrationService: OrchestrationService) {
constructor(
private readonly orchestrationService: OrchestrationService,
private readonly publisher: Publisher,
) {
super();
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
@ -89,8 +93,10 @@ export class Push extends TypedEmitter<PushEvents> {
* relay the former's execution lifecycle events to the creator's frontend.
*/
if (this.orchestrationService.isMultiMainSetupEnabled && !this.backend.hasPushRef(pushRef)) {
const payload = { type, args: data, pushRef };
void this.orchestrationService.publish('relay-execution-lifecycle-event', payload);
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { type, args: data, pushRef },
});
return;
}

View file

@ -478,15 +478,6 @@ export declare namespace ExternalSecretsRequest {
type UpdateProvider = AuthenticatedRequest<{ provider: string }>;
}
// ----------------------------------
// /orchestration
// ----------------------------------
//
export declare namespace OrchestrationRequest {
type GetAll = AuthenticatedRequest;
type Get = AuthenticatedRequest<{ id: string }, {}, {}, {}>;
}
// ----------------------------------
// /workflow-history
// ----------------------------------

View file

@ -1,7 +1,6 @@
import type Redis from 'ioredis';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import type { WorkflowActivateMode } from 'n8n-workflow';
import Container from 'typedi';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
@ -45,35 +44,4 @@ describe('Orchestration Service', () => {
// @ts-expect-error Private field
expect(os.publisher).toBeDefined();
});
describe('shouldAddWebhooks', () => {
test('should return true for init', () => {
// We want to ensure that webhooks are populated on init
// more https://github.com/n8n-io/n8n/pull/8830
const result = os.shouldAddWebhooks('init');
expect(result).toBe(true);
});
test('should return false for leadershipChange', () => {
const result = os.shouldAddWebhooks('leadershipChange');
expect(result).toBe(false);
});
test('should return true for update or activate when is leader', () => {
const modes = ['update', 'activate'] as WorkflowActivateMode[];
for (const mode of modes) {
const result = os.shouldAddWebhooks(mode);
expect(result).toBe(true);
}
});
test('should return false for update or activate when not leader', () => {
instanceSettings.markAsFollower();
const modes = ['update', 'activate'] as WorkflowActivateMode[];
for (const mode of modes) {
const result = os.shouldAddWebhooks(mode);
expect(result).toBe(false);
}
});
});
});

View file

@ -23,10 +23,9 @@ import type { CommunityPackages } from '@/interfaces';
import { License } from '@/license';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { Logger } from '@/logging/logger.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { toError } from '@/utils';
import { OrchestrationService } from './orchestration.service';
const DEFAULT_REGISTRY = 'https://registry.npmjs.org';
const {
@ -60,7 +59,7 @@ export class CommunityPackagesService {
private readonly logger: Logger,
private readonly installedPackageRepository: InstalledPackagesRepository,
private readonly loadNodesAndCredentials: LoadNodesAndCredentials,
private readonly orchestrationService: OrchestrationService,
private readonly publisher: Publisher,
private readonly license: License,
private readonly globalConfig: GlobalConfig,
) {}
@ -322,7 +321,10 @@ export class CommunityPackagesService {
async removePackage(packageName: string, installedPackage: InstalledPackages): Promise<void> {
await this.removeNpmPackage(packageName);
await this.removePackageFromDatabase(installedPackage);
await this.orchestrationService.publish('community-package-uninstall', { packageName });
void this.publisher.publishCommand({
command: 'community-package-uninstall',
payload: { packageName },
});
}
private getNpmRegistry() {
@ -368,10 +370,10 @@ export class CommunityPackagesService {
await this.removePackageFromDatabase(options.installedPackage);
}
const installedPackage = await this.persistInstalledPackage(loader);
await this.orchestrationService.publish(
isUpdate ? 'community-package-update' : 'community-package-install',
{ packageName, packageVersion },
);
void this.publisher.publishCommand({
command: isUpdate ? 'community-package-update' : 'community-package-install',
payload: { packageName, packageVersion },
});
await this.loadNodesAndCredentials.postProcessLoaders();
this.logger.info(`Community package installed: ${packageName}`);
return installedPackage;

View file

@ -1,10 +1,7 @@
import { InstanceSettings } from 'n8n-core';
import type { WorkflowActivateMode } from 'n8n-workflow';
import Container, { Service } from 'typedi';
import config from '@/config';
import type { PubSubCommandMap } from '@/events/maps/pub-sub.event-map';
import { Logger } from '@/logging/logger.service';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { Subscriber } from '@/scaling/pubsub/subscriber.service';
@ -13,7 +10,6 @@ import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee';
@Service()
export class OrchestrationService {
constructor(
private readonly logger: Logger,
readonly instanceSettings: InstanceSettings,
readonly multiMainSetup: MultiMainSetup,
) {}
@ -78,68 +74,4 @@ export class OrchestrationService {
this.isInitialized = false;
}
// ----------------------------------
// pubsub
// ----------------------------------
async publish<CommandKey extends keyof PubSubCommandMap>(
commandKey: CommandKey,
payload?: PubSubCommandMap[CommandKey],
) {
if (!this.sanityCheck()) return;
this.logger.debug(
`[Instance ID ${this.instanceSettings.hostId}] Publishing command "${commandKey}"`,
payload,
);
await this.publisher.publishCommand({ command: commandKey, payload });
}
// ----------------------------------
// workers status
// ----------------------------------
async getWorkerStatus(id?: string) {
if (!this.sanityCheck()) return;
const command = 'get-worker-status';
this.logger.debug(`Sending "${command}" to command channel`);
await this.publisher.publishCommand({
command,
targets: id ? [id] : undefined,
});
}
// ----------------------------------
// activations
// ----------------------------------
/**
* Whether this instance may add webhooks to the `webhook_entity` table.
*/
shouldAddWebhooks(activationMode: WorkflowActivateMode) {
// Always try to populate the webhook entity table as well as register the webhooks
// to prevent issues with users upgrading from a version < 1.15, where the webhook entity
// was cleared on shutdown to anything past 1.28.0, where we stopped populating it on init,
// causing all webhooks to break
if (activationMode === 'init') return true;
if (activationMode === 'leadershipChange') return false;
return this.instanceSettings.isLeader; // 'update' or 'activate'
}
/**
* Whether this instance may add triggers and pollers to memory.
*
* In both single- and multi-main setup, only the leader is allowed to manage
* triggers and pollers in memory, to ensure they are not duplicated.
*/
shouldAddTriggersAndPollers() {
return this.instanceSettings.isLeader;
}
}

View file

@ -39,7 +39,7 @@ let testWebhooks: TestWebhooks;
describe('TestWebhooks', () => {
beforeAll(() => {
testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock());
testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock(), mock());
jest.useFakeTimers();
});

View file

@ -16,6 +16,7 @@ import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error';
import type { IWorkflowDb } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Push } from '@/push';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { removeTrailingSlash } from '@/utils';
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
@ -41,6 +42,7 @@ export class TestWebhooks implements IWebhookManager {
private readonly nodeTypes: NodeTypes,
private readonly registrations: TestWebhookRegistrationsService,
private readonly orchestrationService: OrchestrationService,
private readonly publisher: Publisher,
) {}
private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {};
@ -156,8 +158,10 @@ export class TestWebhooks implements IWebhookManager {
pushRef &&
!this.push.getBackend().hasPushRef(pushRef)
) {
const payload = { webhookKey: key, workflowEntity, pushRef };
void this.orchestrationService.publish('clear-test-webhooks', payload);
void this.publisher.publishCommand({
command: 'clear-test-webhooks',
payload: { webhookKey: key, workflowEntity, pushRef },
});
return;
}

View file

@ -1,4 +1,5 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
import { Container } from 'typedi';
@ -278,3 +279,72 @@ describe('addWebhooks()', () => {
expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1);
});
});
describe('shouldAddWebhooks', () => {
describe('if leader', () => {
const activeWorkflowManager = new ActiveWorkflowManager(
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock<InstanceSettings>({ isLeader: true, isFollower: false }),
mock(),
);
test('should return `true` for `init`', () => {
// ensure webhooks are populated on init: https://github.com/n8n-io/n8n/pull/8830
const result = activeWorkflowManager.shouldAddWebhooks('init');
expect(result).toBe(true);
});
test('should return `false` for `leadershipChange`', () => {
const result = activeWorkflowManager.shouldAddWebhooks('leadershipChange');
expect(result).toBe(false);
});
test('should return `true` for `update` or `activate`', () => {
const modes = ['update', 'activate'] as WorkflowActivateMode[];
for (const mode of modes) {
const result = activeWorkflowManager.shouldAddWebhooks(mode);
expect(result).toBe(true);
}
});
});
describe('if follower', () => {
const activeWorkflowManager = new ActiveWorkflowManager(
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock(),
mock<InstanceSettings>({ isLeader: false, isFollower: true }),
mock(),
);
test('should return `false` for `update` or `activate`', () => {
const modes = ['update', 'activate'] as WorkflowActivateMode[];
for (const mode of modes) {
const result = activeWorkflowManager.shouldAddWebhooks(mode);
expect(result).toBe(false);
}
});
});
});

View file

@ -16,7 +16,7 @@ import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/wor
import * as testDb from '@test-integration/test-db';
describe('CollaborationService', () => {
mockInstance(Push, new Push(mock()));
mockInstance(Push, new Push(mock(), mock()));
let pushService: Push;
let collaborationService: CollaborationService;
let owner: User;

View file

@ -22,6 +22,7 @@ import type { MessageEventBusDestinationSentry } from '@/eventbus/message-event-
import type { MessageEventBusDestinationSyslog } from '@/eventbus/message-event-bus-destination/message-event-bus-destination-syslog.ee';
import type { MessageEventBusDestinationWebhook } from '@/eventbus/message-event-bus-destination/message-event-bus-destination-webhook.ee';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { createUser } from './shared/db/users';
import type { SuperAgentTest } from './shared/types';
@ -34,6 +35,8 @@ const mockedAxios = axios as jest.Mocked<typeof axios>;
jest.mock('syslog-client');
const mockedSyslog = syslog as jest.Mocked<typeof syslog>;
mockInstance(Publisher);
let owner: User;
let authOwnerAgent: SuperAgentTest;

View file

@ -63,6 +63,7 @@ const resetManager = async () => {
mockProvidersInstance,
Container.get(Cipher),
eventService,
mock(),
),
);

View file

@ -32,7 +32,6 @@ export { setupTestServer } from './test-server';
export async function initActiveWorkflowManager() {
mockInstance(OrchestrationService, {
isMultiMainSetupEnabled: false,
shouldAddWebhooks: jest.fn().mockReturnValue(true),
});
mockInstance(Push);