refactor(core): Use typedi to manage EventBus singletons (no-changelog) (#5795)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-03-29 20:38:47 +02:00 committed by GitHub
parent be373bb859
commit 522c790817
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 145 additions and 154 deletions

View file

@ -25,11 +25,11 @@ import type {
import { Telemetry } from '@/telemetry'; import { Telemetry } from '@/telemetry';
import type { AuthProviderType } from '@db/entities/AuthIdentity'; import type { AuthProviderType } from '@db/entities/AuthIdentity';
import { RoleService } from './role/role.service'; import { RoleService } from './role/role.service';
import { eventBus } from './eventbus';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import { N8N_VERSION } from '@/constants'; import { N8N_VERSION } from '@/constants';
import * as Db from '@/Db'; import * as Db from '@/Db';
import { NodeTypes } from './NodeTypes'; import { NodeTypes } from './NodeTypes';
import { MessageEventBus } from '@/eventbus';
function userToPayload(user: User): { function userToPayload(user: User): {
userId: string; userId: string;
@ -51,7 +51,11 @@ function userToPayload(user: User): {
export class InternalHooks implements IInternalHooksClass { export class InternalHooks implements IInternalHooksClass {
private instanceId: string; private instanceId: string;
constructor(private telemetry: Telemetry, private nodeTypes: NodeTypes) {} constructor(
private telemetry: Telemetry,
private nodeTypes: NodeTypes,
private eventBus: MessageEventBus,
) {}
async init(instanceId: string) { async init(instanceId: string) {
this.instanceId = instanceId; this.instanceId = instanceId;
@ -111,7 +115,7 @@ export class InternalHooks implements IInternalHooksClass {
async onWorkflowCreated(user: User, workflow: IWorkflowBase, publicApi: boolean): Promise<void> { async onWorkflowCreated(user: User, workflow: IWorkflowBase, publicApi: boolean): Promise<void> {
const { nodeGraph } = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes); const { nodeGraph } = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.workflow.created', eventName: 'n8n.audit.workflow.created',
payload: { payload: {
...userToPayload(user), ...userToPayload(user),
@ -130,7 +134,7 @@ export class InternalHooks implements IInternalHooksClass {
async onWorkflowDeleted(user: User, workflowId: string, publicApi: boolean): Promise<void> { async onWorkflowDeleted(user: User, workflowId: string, publicApi: boolean): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.workflow.deleted', eventName: 'n8n.audit.workflow.deleted',
payload: { payload: {
...userToPayload(user), ...userToPayload(user),
@ -162,7 +166,7 @@ export class InternalHooks implements IInternalHooksClass {
} }
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.workflow.updated', eventName: 'n8n.audit.workflow.updated',
payload: { payload: {
...userToPayload(user), ...userToPayload(user),
@ -194,7 +198,7 @@ export class InternalHooks implements IInternalHooksClass {
nodeName: string, nodeName: string,
): Promise<void> { ): Promise<void> {
const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName); const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName);
void eventBus.sendNodeEvent({ void this.eventBus.sendNodeEvent({
eventName: 'n8n.node.started', eventName: 'n8n.node.started',
payload: { payload: {
executionId, executionId,
@ -212,7 +216,7 @@ export class InternalHooks implements IInternalHooksClass {
nodeName: string, nodeName: string,
): Promise<void> { ): Promise<void> {
const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName); const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName);
void eventBus.sendNodeEvent({ void this.eventBus.sendNodeEvent({
eventName: 'n8n.node.finished', eventName: 'n8n.node.finished',
payload: { payload: {
executionId, executionId,
@ -230,7 +234,7 @@ export class InternalHooks implements IInternalHooksClass {
): Promise<void> { ): Promise<void> {
void Promise.all([ void Promise.all([
Db.collections.Execution.update(executionId, { status: 'running' }), Db.collections.Execution.update(executionId, { status: 'running' }),
eventBus.sendWorkflowEvent({ this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.started', eventName: 'n8n.workflow.started',
payload: { payload: {
executionId, executionId,
@ -249,7 +253,7 @@ export class InternalHooks implements IInternalHooksClass {
workflowData?: IWorkflowBase, workflowData?: IWorkflowBase,
): Promise<void> { ): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendWorkflowEvent({ this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.crashed', eventName: 'n8n.workflow.crashed',
payload: { payload: {
executionId, executionId,
@ -410,7 +414,7 @@ export class InternalHooks implements IInternalHooksClass {
promises.push( promises.push(
properties.success properties.success
? eventBus.sendWorkflowEvent({ ? this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.success', eventName: 'n8n.workflow.success',
payload: { payload: {
executionId, executionId,
@ -421,7 +425,7 @@ export class InternalHooks implements IInternalHooksClass {
workflowName: workflow.name, workflowName: workflow.name,
}, },
}) })
: eventBus.sendWorkflowEvent({ : this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.failed', eventName: 'n8n.workflow.failed',
payload: { payload: {
executionId, executionId,
@ -469,7 +473,7 @@ export class InternalHooks implements IInternalHooksClass {
publicApi: boolean; publicApi: boolean;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.deleted', eventName: 'n8n.audit.user.deleted',
payload: { payload: {
...userToPayload(userDeletionData.user), ...userToPayload(userDeletionData.user),
@ -490,7 +494,7 @@ export class InternalHooks implements IInternalHooksClass {
email_sent: boolean; email_sent: boolean;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.invited', eventName: 'n8n.audit.user.invited',
payload: { payload: {
...userToPayload(userInviteData.user), ...userToPayload(userInviteData.user),
@ -512,7 +516,7 @@ export class InternalHooks implements IInternalHooksClass {
public_api: boolean; public_api: boolean;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.reinvited', eventName: 'n8n.audit.user.reinvited',
payload: { payload: {
...userToPayload(userReinviteData.user), ...userToPayload(userReinviteData.user),
@ -571,7 +575,7 @@ export class InternalHooks implements IInternalHooksClass {
async onUserUpdate(userUpdateData: { user: User; fields_changed: string[] }): Promise<void> { async onUserUpdate(userUpdateData: { user: User; fields_changed: string[] }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.updated', eventName: 'n8n.audit.user.updated',
payload: { payload: {
...userToPayload(userUpdateData.user), ...userToPayload(userUpdateData.user),
@ -590,7 +594,7 @@ export class InternalHooks implements IInternalHooksClass {
invitee: User; invitee: User;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.invitation.accepted', eventName: 'n8n.audit.user.invitation.accepted',
payload: { payload: {
invitee: { invitee: {
@ -609,7 +613,7 @@ export class InternalHooks implements IInternalHooksClass {
async onUserPasswordResetEmailClick(userPasswordResetData: { user: User }): Promise<void> { async onUserPasswordResetEmailClick(userPasswordResetData: { user: User }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.reset', eventName: 'n8n.audit.user.reset',
payload: { payload: {
...userToPayload(userPasswordResetData.user), ...userToPayload(userPasswordResetData.user),
@ -643,7 +647,7 @@ export class InternalHooks implements IInternalHooksClass {
async onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise<void> { async onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.api.deleted', eventName: 'n8n.audit.user.api.deleted',
payload: { payload: {
...userToPayload(apiKeyDeletedData.user), ...userToPayload(apiKeyDeletedData.user),
@ -658,7 +662,7 @@ export class InternalHooks implements IInternalHooksClass {
async onApiKeyCreated(apiKeyCreatedData: { user: User; public_api: boolean }): Promise<void> { async onApiKeyCreated(apiKeyCreatedData: { user: User; public_api: boolean }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.api.created', eventName: 'n8n.audit.user.api.created',
payload: { payload: {
...userToPayload(apiKeyCreatedData.user), ...userToPayload(apiKeyCreatedData.user),
@ -673,7 +677,7 @@ export class InternalHooks implements IInternalHooksClass {
async onUserPasswordResetRequestClick(userPasswordResetData: { user: User }): Promise<void> { async onUserPasswordResetRequestClick(userPasswordResetData: { user: User }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.reset.requested', eventName: 'n8n.audit.user.reset.requested',
payload: { payload: {
...userToPayload(userPasswordResetData.user), ...userToPayload(userPasswordResetData.user),
@ -697,7 +701,7 @@ export class InternalHooks implements IInternalHooksClass {
}, },
): Promise<void> { ): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.signedup', eventName: 'n8n.audit.user.signedup',
payload: { payload: {
...userToPayload(user), ...userToPayload(user),
@ -716,7 +720,7 @@ export class InternalHooks implements IInternalHooksClass {
public_api: boolean; public_api: boolean;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.email.failed', eventName: 'n8n.audit.user.email.failed',
payload: { payload: {
messageType: failedEmailData.message_type, messageType: failedEmailData.message_type,
@ -741,7 +745,7 @@ export class InternalHooks implements IInternalHooksClass {
public_api: boolean; public_api: boolean;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.credentials.created', eventName: 'n8n.audit.user.credentials.created',
payload: { payload: {
...userToPayload(userCreatedCredentialsData.user), ...userToPayload(userCreatedCredentialsData.user),
@ -769,7 +773,7 @@ export class InternalHooks implements IInternalHooksClass {
sharees_removed: number | null; sharees_removed: number | null;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.credentials.shared', eventName: 'n8n.audit.user.credentials.shared',
payload: { payload: {
...userToPayload(userSharedCredentialsData.user), ...userToPayload(userSharedCredentialsData.user),
@ -809,7 +813,7 @@ export class InternalHooks implements IInternalHooksClass {
failure_reason?: string; failure_reason?: string;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.package.installed', eventName: 'n8n.audit.package.installed',
payload: { payload: {
...userToPayload(installationData.user), ...userToPayload(installationData.user),
@ -847,7 +851,7 @@ export class InternalHooks implements IInternalHooksClass {
package_author_email?: string; package_author_email?: string;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.package.updated', eventName: 'n8n.audit.package.updated',
payload: { payload: {
...userToPayload(updateData.user), ...userToPayload(updateData.user),
@ -880,7 +884,7 @@ export class InternalHooks implements IInternalHooksClass {
package_author_email?: string; package_author_email?: string;
}): Promise<void> { }): Promise<void> {
void Promise.all([ void Promise.all([
eventBus.sendAuditEvent({ this.eventBus.sendAuditEvent({
eventName: 'n8n.audit.package.deleted', eventName: 'n8n.audit.package.deleted',
payload: { payload: {
...userToPayload(deleteData.user), ...userToPayload(deleteData.user),

View file

@ -146,7 +146,6 @@ import { configureMetrics } from './metrics';
import { setupBasicAuth } from './middlewares/basicAuth'; import { setupBasicAuth } from './middlewares/basicAuth';
import { setupExternalJWTAuth } from './middlewares/externalJWTAuth'; import { setupExternalJWTAuth } from './middlewares/externalJWTAuth';
import { PostHogClient } from './posthog'; import { PostHogClient } from './posthog';
import { eventBus } from './eventbus';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks'; import { InternalHooks } from './InternalHooks';
import { import {
@ -157,6 +156,7 @@ import { getSamlLoginLabel, isSamlLoginEnabled, isSamlLicensed } from './sso/sam
import { SamlController } from './sso/saml/routes/saml.controller.ee'; import { SamlController } from './sso/saml/routes/saml.controller.ee';
import { SamlService } from './sso/saml/saml.service.ee'; import { SamlService } from './sso/saml/saml.service.ee';
import { LdapManager } from './Ldap/LdapManager.ee'; import { LdapManager } from './Ldap/LdapManager.ee';
import { MessageEventBus } from '@/eventbus';
const exec = promisify(callbackExec); const exec = promisify(callbackExec);
@ -365,7 +365,7 @@ class Server extends AbstractServer {
return this.frontendSettings; return this.frontendSettings;
} }
private registerControllers(ignoredEndpoints: Readonly<string[]>) { private async registerControllers(ignoredEndpoints: Readonly<string[]>) {
const { app, externalHooks, activeWorkflowRunner, nodeTypes } = this; const { app, externalHooks, activeWorkflowRunner, nodeTypes } = this;
const repositories = Db.collections; const repositories = Db.collections;
setupAuthMiddlewares(app, ignoredEndpoints, this.restEndpoint, repositories.User); setupAuthMiddlewares(app, ignoredEndpoints, this.restEndpoint, repositories.User);
@ -376,8 +376,11 @@ class Server extends AbstractServer {
const postHog = this.postHog; const postHog = this.postHog;
const samlService = Container.get(SamlService); const samlService = Container.get(SamlService);
const eventBus = Container.get(MessageEventBus);
await eventBus.initialize();
const controllers: object[] = [ const controllers: object[] = [
new EventBusController(), new EventBusController(eventBus),
new AuthController({ config, internalHooks, repositories, logger, postHog }), new AuthController({ config, internalHooks, repositories, logger, postHog }),
new OwnerController({ config, internalHooks, repositories, logger }), new OwnerController({ config, internalHooks, repositories, logger }),
new MeController({ externalHooks, internalHooks, repositories, logger }), new MeController({ externalHooks, internalHooks, repositories, logger }),
@ -497,7 +500,7 @@ class Server extends AbstractServer {
await handleLdapInit(); await handleLdapInit();
this.registerControllers(ignoredEndpoints); await this.registerControllers(ignoredEndpoints);
this.app.use(`/${this.restEndpoint}/credentials`, credentialsController); this.app.use(`/${this.restEndpoint}/credentials`, credentialsController);
@ -1223,14 +1226,6 @@ class Server extends AbstractServer {
), ),
); );
// ----------------------------------------
// EventBus Setup
// ----------------------------------------
if (!eventBus.isInitialized) {
await eventBus.initialize();
}
// ---------------------------------------- // ----------------------------------------
// Webhooks // Webhooks
// ---------------------------------------- // ----------------------------------------

View file

@ -55,7 +55,7 @@ import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting'; import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { Push } from '@/push'; import { Push } from '@/push';
import { eventBus } from './eventbus'; import { MessageEventBus } from '@/eventbus';
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents'; import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks'; import { InternalHooks } from './InternalHooks';
@ -67,9 +67,12 @@ export class WorkflowRunner {
jobQueue: JobQueue; jobQueue: JobQueue;
eventBus: MessageEventBus;
constructor() { constructor() {
this.push = Container.get(Push); this.push = Container.get(Push);
this.activeExecutions = Container.get(ActiveExecutions); this.activeExecutions = Container.get(ActiveExecutions);
this.eventBus = Container.get(MessageEventBus);
} }
/** /**
@ -116,7 +119,7 @@ export class WorkflowRunner {
// does contain those messages. // does contain those messages.
try { try {
// Search for messages for this executionId in event logs // Search for messages for this executionId in event logs
const eventLogMessages = await eventBus.getEventsByExecutionId(executionId); const eventLogMessages = await this.eventBus.getEventsByExecutionId(executionId);
// Attempt to recover more better runData from these messages (but don't update the execution db entry yet) // Attempt to recover more better runData from these messages (but don't update the execution db entry yet)
if (eventLogMessages.length > 0) { if (eventLogMessages.length > 0) {
const eventLogExecutionData = await recoverExecutionDataFromEventLogMessages( const eventLogExecutionData = await recoverExecutionDataFromEventLogMessages(

View file

@ -4,6 +4,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/naming-convention */ /* eslint-disable @typescript-eslint/naming-convention */
import { Container } from 'typedi';
import { Router } from 'express'; import { Router } from 'express';
import bodyParser from 'body-parser'; import bodyParser from 'body-parser';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
@ -11,7 +12,7 @@ import config from '@/config';
import * as Db from '@/Db'; import * as Db from '@/Db';
import type { Role } from '@db/entities/Role'; import type { Role } from '@db/entities/Role';
import { hashPassword } from '@/UserManagement/UserManagementHelper'; import { hashPassword } from '@/UserManagement/UserManagementHelper';
import { eventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { MessageEventBus } from '@/eventbus';
if (process.env.E2E_TESTS !== 'true') { if (process.env.E2E_TESTS !== 'true') {
console.error('E2E endpoints only allowed during E2E tests'); console.error('E2E endpoints only allowed during E2E tests');
@ -79,6 +80,7 @@ const setupUserManagement = async () => {
const resetLogStreaming = async () => { const resetLogStreaming = async () => {
config.set('enterprise.features.logStreaming', false); config.set('enterprise.features.logStreaming', false);
const eventBus = Container.get(MessageEventBus);
for (const id in eventBus.destinations) { for (const id in eventBus.destinations) {
await eventBus.removeDestination(id); await eventBus.removeDestination(id);
} }

View file

@ -25,10 +25,10 @@ import * as Server from '@/Server';
import { TestWebhooks } from '@/TestWebhooks'; import { TestWebhooks } from '@/TestWebhooks';
import { getAllInstalledPackages } from '@/CommunityNodes/packageModel'; import { getAllInstalledPackages } from '@/CommunityNodes/packageModel';
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants'; import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand'; import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { License } from '@/License'; import { License } from '@/License';
import { MessageEventBus } from '@/eventbus';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open'); const open = require('open');
@ -133,7 +133,7 @@ export class Start extends BaseCommand {
} }
//finally shut down Event Bus //finally shut down Event Bus
await eventBus.close(); await Container.get(MessageEventBus).close();
} catch (error) { } catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error); await this.exitWithCrash('There was an error shutting down n8n.', error);
} }

View file

@ -1,3 +1,4 @@
import { Service } from 'typedi';
import { LoggerProxy } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm'; import type { DeleteResult } from 'typeorm';
@ -37,12 +38,9 @@ export interface MessageWithCallback {
confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void; confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void;
} }
@Service()
export class MessageEventBus extends EventEmitter { export class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus; private isInitialized = false;
isInitialized: boolean;
logWriter: MessageEventBusLogWriter;
destinations: { destinations: {
[key: string]: MessageEventBusDestination; [key: string]: MessageEventBusDestination;
@ -50,16 +48,8 @@ export class MessageEventBus extends EventEmitter {
private pushIntervalTimer: NodeJS.Timer; private pushIntervalTimer: NodeJS.Timer;
constructor() { constructor(private logWriter: MessageEventBusLogWriter) {
super(); super();
this.isInitialized = false;
}
static getInstance(): MessageEventBus {
if (!MessageEventBus.instance) {
MessageEventBus.instance = new MessageEventBus();
}
return MessageEventBus.instance;
} }
/** /**
@ -93,7 +83,7 @@ export class MessageEventBus extends EventEmitter {
} }
LoggerProxy.debug('Initializing event writer'); LoggerProxy.debug('Initializing event writer');
this.logWriter = await MessageEventBusLogWriter.getInstance(); await this.logWriter.startThread();
// unsent event check: // unsent event check:
// - find unsent messages in current event log(s) // - find unsent messages in current event log(s)
@ -102,9 +92,9 @@ export class MessageEventBus extends EventEmitter {
LoggerProxy.debug('Checking for unsent event messages'); LoggerProxy.debug('Checking for unsent event messages');
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug( LoggerProxy.debug(
`Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `, `Start logging into ${this.logWriter.getLogFileName() ?? 'unknown filename'} `,
); );
this.logWriter?.startLogging(); this.logWriter.startLogging();
await this.send(unsentAndUnfinished.unsentMessages); await this.send(unsentAndUnfinished.unsentMessages);
if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) { if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) {
@ -171,7 +161,7 @@ export class MessageEventBus extends EventEmitter {
async close() { async close() {
LoggerProxy.debug('Shutting down event writer...'); LoggerProxy.debug('Shutting down event writer...');
await this.logWriter?.close(); await this.logWriter.close();
for (const destinationName of Object.keys(this.destinations)) { for (const destinationName of Object.keys(this.destinations)) {
LoggerProxy.debug( LoggerProxy.debug(
`Shutting down event destination ${this.destinations[destinationName].getId()}...`, `Shutting down event destination ${this.destinations[destinationName].getId()}...`,
@ -186,7 +176,7 @@ export class MessageEventBus extends EventEmitter {
msgs = [msgs]; msgs = [msgs];
} }
for (const msg of msgs) { for (const msg of msgs) {
this.logWriter?.putMessage(msg); this.logWriter.putMessage(msg);
// if there are no set up destinations, immediately mark the event as sent // if there are no set up destinations, immediately mark the event as sent
if (!this.shouldSendMsg(msg)) { if (!this.shouldSendMsg(msg)) {
this.confirmSent(msg, { id: '0', name: 'eventBus' }); this.confirmSent(msg, { id: '0', name: 'eventBus' });
@ -211,7 +201,7 @@ export class MessageEventBus extends EventEmitter {
} }
confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) { confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) {
this.logWriter?.confirmMessageSent(msg.id, source); this.logWriter.confirmMessageSent(msg.id, source);
} }
private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean { private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean {
@ -256,7 +246,7 @@ export class MessageEventBus extends EventEmitter {
async getEventsFailed(amount = 5): Promise<FailedEventSummary[]> { async getEventsFailed(amount = 5): Promise<FailedEventSummary[]> {
const result: FailedEventSummary[] = []; const result: FailedEventSummary[] = [];
try { try {
const queryResult = await this.logWriter?.getMessagesAll(); const queryResult = await this.logWriter.getMessagesAll();
const uniques = uniqby(queryResult, 'id'); const uniques = uniqby(queryResult, 'id');
const filteredExecutionIds = uniques const filteredExecutionIds = uniques
.filter((e) => .filter((e) =>
@ -296,25 +286,25 @@ export class MessageEventBus extends EventEmitter {
} }
async getEventsAll(): Promise<EventMessageTypes[]> { async getEventsAll(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesAll(); const queryResult = await this.logWriter.getMessagesAll();
const filtered = uniqby(queryResult, 'id'); const filtered = uniqby(queryResult, 'id');
return filtered; return filtered;
} }
async getEventsSent(): Promise<EventMessageTypes[]> { async getEventsSent(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesSent(); const queryResult = await this.logWriter.getMessagesSent();
const filtered = uniqby(queryResult, 'id'); const filtered = uniqby(queryResult, 'id');
return filtered; return filtered;
} }
async getEventsUnsent(): Promise<EventMessageTypes[]> { async getEventsUnsent(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesUnsent(); const queryResult = await this.logWriter.getMessagesUnsent();
const filtered = uniqby(queryResult, 'id'); const filtered = uniqby(queryResult, 'id');
return filtered; return filtered;
} }
async getUnfinishedExecutions(): Promise<Record<string, EventMessageTypes[]>> { async getUnfinishedExecutions(): Promise<Record<string, EventMessageTypes[]>> {
const queryResult = await this.logWriter?.getUnfinishedExecutions(); const queryResult = await this.logWriter.getUnfinishedExecutions();
return queryResult; return queryResult;
} }
@ -322,7 +312,7 @@ export class MessageEventBus extends EventEmitter {
unsentMessages: EventMessageTypes[]; unsentMessages: EventMessageTypes[];
unfinishedExecutions: Record<string, EventMessageTypes[]>; unfinishedExecutions: Record<string, EventMessageTypes[]>;
}> { }> {
const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions(); const queryResult = await this.logWriter.getUnsentAndUnfinishedExecutions();
return queryResult; return queryResult;
} }
@ -336,7 +326,7 @@ export class MessageEventBus extends EventEmitter {
executionId: string, executionId: string,
logHistory?: number, logHistory?: number,
): Promise<EventMessageTypes[]> { ): Promise<EventMessageTypes[]> {
const result = await this.logWriter?.getMessagesByExecutionId(executionId, logHistory); const result = await this.logWriter.getMessagesByExecutionId(executionId, logHistory);
return result; return result;
} }
/** /**
@ -355,5 +345,3 @@ export class MessageEventBus extends EventEmitter {
await this.send(new EventMessageNode(options)); await this.send(new EventMessageNode(options));
} }
} }
export const eventBus = MessageEventBus.getInstance();

View file

@ -1,3 +1,4 @@
import { Container } from 'typedi';
import { parse, stringify } from 'flatted'; import { parse, stringify } from 'flatted';
import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow';
import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow'; import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow';
@ -5,12 +6,11 @@ import * as Db from '@/Db';
import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses'; import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses';
import type { DateTime } from 'luxon'; import type { DateTime } from 'luxon';
import { Push } from '@/push'; import { Push } from '@/push';
import type { IPushDataExecutionRecovered } from '../../Interfaces'; import type { IPushDataExecutionRecovered } from '@/Interfaces';
import { workflowExecutionCompleted } from '../../events/WorkflowStatistics'; import { workflowExecutionCompleted } from '@/events/WorkflowStatistics';
import { eventBus } from './MessageEventBus';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData';
import { MessageEventBus } from './MessageEventBus';
export async function recoverExecutionDataFromEventLogMessages( export async function recoverExecutionDataFromEventLogMessages(
executionId: string, executionId: string,
@ -201,7 +201,7 @@ export async function recoverExecutionDataFromEventLogMessages(
await workflowExecutionCompleted(executionEntry.workflowData, iRunData); await workflowExecutionCompleted(executionEntry.workflowData, iRunData);
// wait for UI to be back up and send the execution data // wait for UI to be back up and send the execution data
eventBus.once('editorUiConnected', function handleUiBackUp() { Container.get(MessageEventBus).once('editorUiConnected', function handleUiBackUp() {
// add a small timeout to make sure the UI is back up // add a small timeout to make sure the UI is back up
setTimeout(() => { setTimeout(() => {
Container.get(Push).send('executionRecovered', { Container.get(Push).send('executionRecovered', {

View file

@ -1,14 +1,17 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage'; import { Service } from 'typedi';
import { UserSettings } from 'n8n-core'; import { once as eventOnce } from 'events';
import path, { parse } from 'path'; import path, { parse } from 'path';
import { Worker } from 'worker_threads'; import { Worker } from 'worker_threads';
import { createReadStream, existsSync, rmSync } from 'fs'; import { createReadStream, existsSync, rmSync } from 'fs';
import readline from 'readline'; import readline from 'readline';
import { UserSettings } from 'n8n-core';
import { jsonParse, LoggerProxy } from 'n8n-workflow'; import { jsonParse, LoggerProxy } from 'n8n-workflow';
import remove from 'lodash.remove'; import remove from 'lodash.remove';
import config from '@/config'; import config from '@/config';
import { inTest } from '@/constants';
import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import type { EventMessageReturnMode } from '../MessageEventBus/MessageEventBus'; import type { EventMessageReturnMode } from '../MessageEventBus/MessageEventBus';
import type { EventMessageTypes } from '../EventMessageClasses'; import type { EventMessageTypes } from '../EventMessageClasses';
@ -17,15 +20,6 @@ import {
EventMessageConfirm, EventMessageConfirm,
isEventMessageConfirm, isEventMessageConfirm,
} from '../EventMessageClasses/EventMessageConfirm'; } from '../EventMessageClasses/EventMessageConfirm';
import { once as eventOnce } from 'events';
import { inTest } from '../../constants';
interface MessageEventBusLogWriterConstructorOptions {
logBaseName?: string;
logBasePath?: string;
keepNumberOfFiles?: number;
maxFileSizeInKB?: number;
}
export interface MessageEventBusLogWriterOptions { export interface MessageEventBusLogWriterOptions {
logFullBasePath: string; logFullBasePath: string;
@ -42,41 +36,23 @@ interface ReadMessagesFromLogFileResult {
/** /**
* MessageEventBusWriter for Files * MessageEventBusWriter for Files
*/ */
@Service()
export class MessageEventBusLogWriter { export class MessageEventBusLogWriter {
private static instance: MessageEventBusLogWriter; private options: Required<MessageEventBusLogWriterOptions>;
static options: Required<MessageEventBusLogWriterOptions>;
private _worker: Worker | undefined; private _worker: Worker | undefined;
public get worker(): Worker | undefined { constructor() {
return this._worker; const { keepLogCount, logBaseName, maxFileSizeInKB } = config.get('eventBus.logWriter');
this.options = {
logFullBasePath: path.join(UserSettings.getUserN8nFolderPath(), logBaseName),
keepNumberOfFiles: keepLogCount,
maxFileSizeInKB,
};
} }
/** public get worker(): Worker | undefined {
* Instantiates the Writer and the corresponding worker thread. return this._worker;
* To actually start logging, call startLogging() function on the instance.
*
* **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging()
*/
static async getInstance(
options?: MessageEventBusLogWriterConstructorOptions,
): Promise<MessageEventBusLogWriter> {
if (!MessageEventBusLogWriter.instance) {
MessageEventBusLogWriter.instance = new MessageEventBusLogWriter();
MessageEventBusLogWriter.options = {
logFullBasePath: path.join(
options?.logBasePath ?? UserSettings.getUserN8nFolderPath(),
options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'),
),
keepNumberOfFiles:
options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'),
maxFileSizeInKB:
options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'),
};
await MessageEventBusLogWriter.instance.startThread();
}
return MessageEventBusLogWriter.instance;
} }
/** /**
@ -98,13 +74,19 @@ export class MessageEventBusLogWriter {
} }
} }
private async startThread() { /**
* Instantiates the Writer and the corresponding worker thread.
* To actually start logging, call startLogging() function on the instance.
*
* **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging()
*/
async startThread() {
if (this.worker) { if (this.worker) {
await this.close(); await this.close();
} }
await MessageEventBusLogWriter.instance.spawnThread(); await this.spawnThread();
if (this.worker) { if (this.worker) {
this.worker.postMessage({ command: 'initialize', data: MessageEventBusLogWriter.options }); this.worker.postMessage({ command: 'initialize', data: this.options });
} }
} }
@ -120,7 +102,7 @@ export class MessageEventBusLogWriter {
if (this.worker) { if (this.worker) {
this.worker.on('messageerror', async (error) => { this.worker.on('messageerror', async (error) => {
LoggerProxy.error('Event Bus Log Writer thread error, attempting to restart...', error); LoggerProxy.error('Event Bus Log Writer thread error, attempting to restart...', error);
await MessageEventBusLogWriter.instance.startThread(); await this.startThread();
}); });
return true; return true;
} }
@ -235,14 +217,14 @@ export class MessageEventBusLogWriter {
getLogFileName(counter?: number): string { getLogFileName(counter?: number): string {
if (counter) { if (counter) {
return `${MessageEventBusLogWriter.options.logFullBasePath}-${counter}.log`; return `${this.options.logFullBasePath}-${counter}.log`;
} else { } else {
return `${MessageEventBusLogWriter.options.logFullBasePath}.log`; return `${this.options.logFullBasePath}.log`;
} }
} }
cleanAllLogs() { cleanAllLogs() {
for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) { for (let i = 0; i <= this.options.keepNumberOfFiles; i++) {
if (existsSync(this.getLogFileName(i))) { if (existsSync(this.getLogFileName(i))) {
rmSync(this.getLogFileName(i)); rmSync(this.getLogFileName(i));
} }

View file

@ -90,11 +90,12 @@ if (!isMainThread) {
clearInterval(fileStatTimer); clearInterval(fileStatTimer);
break; break;
case 'initialize': case 'initialize':
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const { logFullBasePath, keepNumberOfFiles, maxFileSizeInKB } =
data as MessageEventBusLogWriterOptions;
const settings: MessageEventBusLogWriterOptions = { const settings: MessageEventBusLogWriterOptions = {
logFullBasePath: (data as MessageEventBusLogWriterOptions).logFullBasePath ?? '', logFullBasePath: logFullBasePath ?? '',
keepNumberOfFiles: (data as MessageEventBusLogWriterOptions).keepNumberOfFiles ?? 10, keepNumberOfFiles: keepNumberOfFiles ?? 3,
maxFileSizeInKB: (data as MessageEventBusLogWriterOptions).maxFileSizeInKB ?? 102400, maxFileSizeInKB: maxFileSizeInKB ?? 1024,
}; };
setLogFileBasePath(settings.logFullBasePath); setLogFileBasePath(settings.logFullBasePath);
setKeepFiles(settings.keepNumberOfFiles); setKeepFiles(settings.keepNumberOfFiles);

View file

@ -6,8 +6,8 @@ import { isEventMessageOptions } from './EventMessageClasses/AbstractEventMessag
import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric'; import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric';
import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMessageWorkflow'; import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMessageWorkflow';
import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow'; import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow';
import { MessageEventBus } from './MessageEventBus/MessageEventBus';
import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus'; import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus';
import { eventBus } from './MessageEventBus/MessageEventBus';
import { import {
isMessageEventBusDestinationSentryOptions, isMessageEventBusDestinationSentryOptions,
MessageEventBusDestinationSentry, MessageEventBusDestinationSentry,
@ -76,6 +76,8 @@ const isMessageEventBusDestinationOptions = (
@RestController('/eventbus') @RestController('/eventbus')
export class EventBusController { export class EventBusController {
constructor(private eventBus: MessageEventBus) {}
// ---------------------------------------- // ----------------------------------------
// Events // Events
// ---------------------------------------- // ----------------------------------------
@ -86,24 +88,24 @@ export class EventBusController {
if (isWithQueryString(req.query)) { if (isWithQueryString(req.query)) {
switch (req.query.query as EventMessageReturnMode) { switch (req.query.query as EventMessageReturnMode) {
case 'sent': case 'sent':
return eventBus.getEventsSent(); return this.eventBus.getEventsSent();
case 'unsent': case 'unsent':
return eventBus.getEventsUnsent(); return this.eventBus.getEventsUnsent();
case 'unfinished': case 'unfinished':
return eventBus.getUnfinishedExecutions(); return this.eventBus.getUnfinishedExecutions();
case 'all': case 'all':
default: default:
return eventBus.getEventsAll(); return this.eventBus.getEventsAll();
} }
} else { } else {
return eventBus.getEventsAll(); return this.eventBus.getEventsAll();
} }
} }
@Get('/failed') @Get('/failed')
async getFailedEvents(req: express.Request): Promise<FailedEventSummary[]> { async getFailedEvents(req: express.Request): Promise<FailedEventSummary[]> {
const amount = parseInt(req.query?.amount as string) ?? 5; const amount = parseInt(req.query?.amount as string) ?? 5;
return eventBus.getEventsFailed(amount); return this.eventBus.getEventsFailed(amount);
} }
@Get('/execution/:id') @Get('/execution/:id')
@ -113,7 +115,7 @@ export class EventBusController {
if (req.query?.logHistory) { if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10); logHistory = parseInt(req.query.logHistory as string, 10);
} }
return eventBus.getEventsByExecutionId(req.params.id, logHistory); return this.eventBus.getEventsByExecutionId(req.params.id, logHistory);
} }
return; return;
} }
@ -124,7 +126,7 @@ export class EventBusController {
if (req.params?.id) { if (req.params?.id) {
const logHistory = parseInt(req.query.logHistory as string, 10) || undefined; const logHistory = parseInt(req.query.logHistory as string, 10) || undefined;
const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true; const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true;
const messages = await eventBus.getEventsByExecutionId(id, logHistory); const messages = await this.eventBus.getEventsByExecutionId(id, logHistory);
if (messages.length > 0) { if (messages.length > 0) {
return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb); return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb);
} }
@ -150,7 +152,7 @@ export class EventBusController {
default: default:
msg = new EventMessageGeneric(req.body); msg = new EventMessageGeneric(req.body);
} }
await eventBus.send(msg); await this.eventBus.send(msg);
} else { } else {
throw new BadRequestError( throw new BadRequestError(
'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}', 'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}',
@ -166,9 +168,9 @@ export class EventBusController {
@Get('/destination') @Get('/destination')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> { async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) { if (isWithIdString(req.query)) {
return eventBus.findDestination(req.query.id); return this.eventBus.findDestination(req.query.id);
} else { } else {
return eventBus.findDestination(); return this.eventBus.findDestination();
} }
} }
@ -183,22 +185,22 @@ export class EventBusController {
switch (req.body.__type) { switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry: case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) { if (isMessageEventBusDestinationSentryOptions(req.body)) {
result = await eventBus.addDestination( result = await this.eventBus.addDestination(
new MessageEventBusDestinationSentry(eventBus, req.body), new MessageEventBusDestinationSentry(this.eventBus, req.body),
); );
} }
break; break;
case MessageEventBusDestinationTypeNames.webhook: case MessageEventBusDestinationTypeNames.webhook:
if (isMessageEventBusDestinationWebhookOptions(req.body)) { if (isMessageEventBusDestinationWebhookOptions(req.body)) {
result = await eventBus.addDestination( result = await this.eventBus.addDestination(
new MessageEventBusDestinationWebhook(eventBus, req.body), new MessageEventBusDestinationWebhook(this.eventBus, req.body),
); );
} }
break; break;
case MessageEventBusDestinationTypeNames.syslog: case MessageEventBusDestinationTypeNames.syslog:
if (isMessageEventBusDestinationSyslogOptions(req.body)) { if (isMessageEventBusDestinationSyslogOptions(req.body)) {
result = await eventBus.addDestination( result = await this.eventBus.addDestination(
new MessageEventBusDestinationSyslog(eventBus, req.body), new MessageEventBusDestinationSyslog(this.eventBus, req.body),
); );
} }
break; break;
@ -223,7 +225,7 @@ export class EventBusController {
@Get('/testmessage') @Get('/testmessage')
async sendTestMessage(req: express.Request): Promise<boolean> { async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) { if (isWithIdString(req.query)) {
return eventBus.testDestination(req.query.id); return this.eventBus.testDestination(req.query.id);
} }
return false; return false;
} }
@ -234,7 +236,7 @@ export class EventBusController {
throw new ResponseHelper.UnauthorizedError('Invalid request'); throw new ResponseHelper.UnauthorizedError('Invalid request');
} }
if (isWithIdString(req.query)) { if (isWithIdString(req.query)) {
return eventBus.removeDestination(req.query.id); return this.eventBus.removeDestination(req.query.id);
} else { } else {
throw new BadRequestError('Query is missing id'); throw new BadRequestError('Query is missing id');
} }

View file

@ -1 +1 @@
export { eventBus } from './MessageEventBus/MessageEventBus'; export { MessageEventBus } from './MessageEventBus/MessageEventBus';

View file

@ -1,17 +1,24 @@
import { Container } from 'typedi';
import { jsonStringify, LoggerProxy as Logger } from 'n8n-workflow'; import { jsonStringify, LoggerProxy as Logger } from 'n8n-workflow';
import type { IPushDataType } from '@/Interfaces'; import type { IPushDataType } from '@/Interfaces';
import { eventBus } from '../eventbus'; import { MessageEventBus } from '@/eventbus';
export abstract class AbstractPush<T> { export abstract class AbstractPush<T> {
protected connections: Record<string, T> = {}; protected connections: Record<string, T> = {};
protected eventBus: MessageEventBus;
constructor() {
this.eventBus = Container.get(MessageEventBus);
}
protected abstract close(connection: T): void; protected abstract close(connection: T): void;
protected abstract sendToOne(connection: T, data: string): void; protected abstract sendToOne(connection: T, data: string): void;
protected add(sessionId: string, connection: T): void { protected add(sessionId: string, connection: T): void {
const { connections } = this; const { connections } = this;
Logger.debug('Add editor-UI session', { sessionId }); Logger.debug('Add editor-UI session', { sessionId });
eventBus.emit('editorUiConnected', sessionId); this.eventBus.emit('editorUiConnected', sessionId);
const existingConnection = connections[sessionId]; const existingConnection = connections[sessionId];
if (existingConnection) { if (existingConnection) {

View file

@ -1,3 +1,4 @@
import { Container } from 'typedi';
import express from 'express'; import express from 'express';
import config from '@/config'; import config from '@/config';
import axios from 'axios'; import axios from 'axios';
@ -16,7 +17,7 @@ import {
MessageEventBusDestinationSyslogOptions, MessageEventBusDestinationSyslogOptions,
MessageEventBusDestinationWebhookOptions, MessageEventBusDestinationWebhookOptions,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { eventBus } from '@/eventbus'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric';
import { MessageEventBusDestinationSyslog } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; import { MessageEventBusDestinationSyslog } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; import { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
@ -64,6 +65,8 @@ const testSentryDestination: MessageEventBusDestinationSentryOptions = {
subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'],
}; };
const eventBus = Container.get(MessageEventBus);
async function confirmIdInAll(id: string) { async function confirmIdInAll(id: string) {
const sent = await eventBus.getEventsAll(); const sent = await eventBus.getEventsAll();
expect(sent.length).toBeGreaterThan(0); expect(sent.length).toBeGreaterThan(0);

View file

@ -81,6 +81,7 @@ import { setSamlLoginEnabled } from '@/sso/saml/samlHelpers';
import { SamlService } from '@/sso/saml/saml.service.ee'; import { SamlService } from '@/sso/saml/saml.service.ee';
import { SamlController } from '@/sso/saml/routes/saml.controller.ee'; import { SamlController } from '@/sso/saml/routes/saml.controller.ee';
import { EventBusController } from '@/eventbus/eventBus.controller'; import { EventBusController } from '@/eventbus/eventBus.controller';
import { MessageEventBus } from '@/eventbus';
export const mockInstance = <T>( export const mockInstance = <T>(
ctor: new (...args: any[]) => T, ctor: new (...args: any[]) => T,
@ -176,7 +177,8 @@ export async function initTestServer({
for (const group of functionEndpoints) { for (const group of functionEndpoints) {
switch (group) { switch (group) {
case 'eventBus': case 'eventBus':
registerController(testServer.app, config, new EventBusController()); const eventBus = Container.get(MessageEventBus);
registerController(testServer.app, config, new EventBusController(eventBus));
break; break;
case 'auth': case 'auth':
registerController( registerController(

View file

@ -29,6 +29,7 @@ import { mockInstance } from '../integration/shared/utils';
import { Push } from '@/push'; import { Push } from '@/push';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
import { NodeTypes } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes';
import { MessageEventBus } from '@/eventbus';
/** /**
* TODO: * TODO:
@ -155,6 +156,7 @@ describe('ActiveWorkflowRunner', () => {
}; };
Container.set(LoadNodesAndCredentials, nodesAndCredentials); Container.set(LoadNodesAndCredentials, nodesAndCredentials);
mockInstance(Push); mockInstance(Push);
mockInstance(MessageEventBus);
}); });
beforeEach(() => { beforeEach(() => {