diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index 257be0f4b4..33f0d16e23 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -1,5 +1,6 @@ import { Service } from 'typedi'; import { snakeCase } from 'change-case'; +import { get as pslGet } from 'psl'; import type { AuthenticationMethod, ExecutionStatus, @@ -10,7 +11,15 @@ import type { WorkflowExecuteMode, } from 'n8n-workflow'; import { TelemetryHelpers } from 'n8n-workflow'; -import { get as pslGet } from 'psl'; +import { InstanceSettings } from 'n8n-core'; + +import { N8N_VERSION } from '@/constants'; +import type { AuthProviderType } from '@db/entities/AuthIdentity'; +import type { GlobalRole, User } from '@db/entities/User'; +import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata'; +import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; +import { MessageEventBus, type EventPayloadWorkflow } from '@/eventbus'; +import { determineFinalExecutionStatus } from '@/executionLifecycleHooks/shared/sharedHookFunctions'; import type { IDiagnosticInfo, ITelemetryUserDeletionData, @@ -18,18 +27,9 @@ import type { IExecutionTrackProperties, IWorkflowExecutionDataProcess, } from '@/Interfaces'; -import { Telemetry } from '@/telemetry'; -import type { AuthProviderType } from '@db/entities/AuthIdentity'; -import { eventBus } from './eventbus'; import { EventsService } from '@/services/events.service'; -import type { GlobalRole, User } from '@db/entities/User'; -import { N8N_VERSION } from '@/constants'; import { NodeTypes } from '@/NodeTypes'; -import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata'; -import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; -import type { EventPayloadWorkflow } from './eventbus/EventMessageClasses/EventMessageWorkflow'; -import { determineFinalExecutionStatus } from './executionLifecycleHooks/shared/sharedHookFunctions'; -import { InstanceSettings } from 'n8n-core'; +import { Telemetry } from '@/telemetry'; function userToPayload(user: User): { userId: string; @@ -55,6 +55,7 @@ export class InternalHooks { private sharedWorkflowRepository: SharedWorkflowRepository, eventsService: EventsService, private readonly instanceSettings: InstanceSettings, + private readonly eventBus: MessageEventBus, ) { eventsService.on( 'telemetry.onFirstProductionWorkflowSuccess', @@ -122,7 +123,7 @@ export class InternalHooks { async onWorkflowCreated(user: User, workflow: IWorkflowBase, publicApi: boolean): Promise { const { nodeGraph } = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes); void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.workflow.created', payload: { ...userToPayload(user), @@ -141,7 +142,7 @@ export class InternalHooks { async onWorkflowDeleted(user: User, workflowId: string, publicApi: boolean): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.workflow.deleted', payload: { ...userToPayload(user), @@ -173,7 +174,7 @@ export class InternalHooks { } void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.workflow.updated', payload: { ...userToPayload(user), @@ -201,7 +202,7 @@ export class InternalHooks { nodeName: string, ): Promise { const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName); - void eventBus.sendNodeEvent({ + void this.eventBus.sendNodeEvent({ eventName: 'n8n.node.started', payload: { executionId, @@ -219,7 +220,7 @@ export class InternalHooks { nodeName: string, ): Promise { const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName); - void eventBus.sendNodeEvent({ + void this.eventBus.sendNodeEvent({ eventName: 'n8n.node.finished', payload: { executionId, @@ -255,7 +256,7 @@ export class InternalHooks { workflowName: (data as IWorkflowBase).name, }; } - void eventBus.sendWorkflowEvent({ + void this.eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.started', payload, }); @@ -277,7 +278,7 @@ export class InternalHooks { } catch {} void Promise.all([ - eventBus.sendWorkflowEvent({ + this.eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.crashed', payload: { executionId, @@ -435,11 +436,11 @@ export class InternalHooks { }; promises.push( telemetryProperties.success - ? eventBus.sendWorkflowEvent({ + ? this.eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.success', payload: sharedEventPayload, }) - : eventBus.sendWorkflowEvent({ + : this.eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.failed', payload: { ...sharedEventPayload, @@ -480,7 +481,7 @@ export class InternalHooks { publicApi: boolean; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.deleted', payload: { ...userToPayload(userDeletionData.user), @@ -502,7 +503,7 @@ export class InternalHooks { invitee_role: string; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.invited', payload: { ...userToPayload(userInviteData.user), @@ -537,7 +538,7 @@ export class InternalHooks { public_api: boolean; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.reinvited', payload: { ...userToPayload(userReinviteData.user), @@ -596,7 +597,7 @@ export class InternalHooks { async onUserUpdate(userUpdateData: { user: User; fields_changed: string[] }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.updated', payload: { ...userToPayload(userUpdateData.user), @@ -615,7 +616,7 @@ export class InternalHooks { invitee: User; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.invitation.accepted', payload: { invitee: { @@ -634,7 +635,7 @@ export class InternalHooks { async onUserPasswordResetEmailClick(userPasswordResetData: { user: User }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.reset', payload: { ...userToPayload(userPasswordResetData.user), @@ -673,7 +674,7 @@ export class InternalHooks { async onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.api.deleted', payload: { ...userToPayload(apiKeyDeletedData.user), @@ -688,7 +689,7 @@ export class InternalHooks { async onApiKeyCreated(apiKeyCreatedData: { user: User; public_api: boolean }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.api.created', payload: { ...userToPayload(apiKeyCreatedData.user), @@ -703,7 +704,7 @@ export class InternalHooks { async onUserPasswordResetRequestClick(userPasswordResetData: { user: User }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.reset.requested', payload: { ...userToPayload(userPasswordResetData.user), @@ -727,7 +728,7 @@ export class InternalHooks { }, ): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.signedup', payload: { ...userToPayload(user), @@ -751,7 +752,7 @@ export class InternalHooks { public_api: boolean; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.email.failed', payload: { messageType: failedEmailData.message_type, @@ -769,7 +770,7 @@ export class InternalHooks { authenticationMethod: AuthenticationMethod; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.login.success', payload: { authenticationMethod: userLoginData.authenticationMethod, @@ -785,7 +786,7 @@ export class InternalHooks { reason?: string; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.login.failed', payload: { authenticationMethod: userLoginData.authenticationMethod, @@ -808,7 +809,7 @@ export class InternalHooks { public_api: boolean; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.credentials.created', payload: { ...userToPayload(userCreatedCredentialsData.user), @@ -836,7 +837,7 @@ export class InternalHooks { sharees_removed: number | null; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.credentials.shared', payload: { ...userToPayload(userSharedCredentialsData.user), @@ -876,7 +877,7 @@ export class InternalHooks { failure_reason?: string; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.package.installed', payload: { ...userToPayload(installationData.user), @@ -914,7 +915,7 @@ export class InternalHooks { package_author_email?: string; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.package.updated', payload: { ...userToPayload(updateData.user), @@ -947,7 +948,7 @@ export class InternalHooks { package_author_email?: string; }): Promise { void Promise.all([ - eventBus.sendAuditEvent({ + this.eventBus.sendAuditEvent({ eventName: 'n8n.audit.package.deleted', payload: { ...userToPayload(deleteData.user), diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 7bad362226..7f1b06b6b8 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -67,7 +67,7 @@ import { setupAuthMiddlewares } from './middlewares'; import { isLdapEnabled } from './Ldap/helpers'; import { AbstractServer } from './AbstractServer'; import { PostHogClient } from './posthog'; -import { eventBus } from './eventbus'; +import { MessageEventBus } from '@/eventbus'; import { InternalHooks } from './InternalHooks'; import { License } from './License'; import { SamlController } from './sso/saml/routes/saml.controller.ee'; @@ -416,10 +416,8 @@ export class Server extends AbstractServer { // ---------------------------------------- // EventBus Setup // ---------------------------------------- - - if (!eventBus.isInitialized) { - await eventBus.initialize(); - } + const eventBus = Container.get(MessageEventBus); + await eventBus.initialize(); if (this.endpointPresetCredentials !== '') { // POST endpoint to set preset credentials diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 00d15cbad9..5c25dbff3f 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -28,6 +28,8 @@ import { fork } from 'child_process'; import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; import { ExecutionRepository } from '@db/repositories/execution.repository'; +import { MessageEventBus } from '@/eventbus'; +import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; import { ExternalHooks } from '@/ExternalHooks'; import type { IExecutionResponse, @@ -125,18 +127,13 @@ export class WorkflowRunner { // does contain those messages. try { // Search for messages for this executionId in event logs - const { eventBus } = await import('./eventbus'); + const eventBus = Container.get(MessageEventBus); const eventLogMessages = await eventBus.getEventsByExecutionId(executionId); // Attempt to recover more better runData from these messages (but don't update the execution db entry yet) if (eventLogMessages.length > 0) { - const { recoverExecutionDataFromEventLogMessages } = await import( - './eventbus/MessageEventBus/recoverEvents' - ); - const eventLogExecutionData = await recoverExecutionDataFromEventLogMessages( - executionId, - eventLogMessages, - false, - ); + const eventLogExecutionData = await Container.get( + ExecutionDataRecoveryService, + ).recoverExecutionData(executionId, eventLogMessages, false); if (eventLogExecutionData) { fullRunData.data.resultData.runData = eventLogExecutionData.resultData.runData; fullRunData.status = 'crashed'; diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index fdfeee92b9..a1ddf03ec7 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -16,7 +16,7 @@ import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { Server } from '@/Server'; import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants'; -import { eventBus } from '@/eventbus'; +import { MessageEventBus } from '@/eventbus'; import { InternalHooks } from '@/InternalHooks'; import { License } from '@/License'; import { OrchestrationService } from '@/services/orchestration.service'; @@ -127,7 +127,7 @@ export class Start extends BaseCommand { } // Finally shut down Event Bus - await eventBus.close(); + await Container.get(MessageEventBus).close(); } catch (error) { await this.exitWithCrash('There was an error shutting down n8n.', error); } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 99657fc9ae..b8a47b86e5 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -29,7 +29,7 @@ import { OwnershipService } from '@/services/ownership.service'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; -import { eventBus } from '@/eventbus'; +import { MessageEventBus } from '@/eventbus'; import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; @@ -307,7 +307,7 @@ export class Worker extends BaseCommand { } async initEventBus() { - await eventBus.initialize({ + await Container.get(MessageEventBus).initialize({ workerId: this.queueModeId, }); } diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts index 8e93d77726..2d0b50d6c4 100644 --- a/packages/cli/src/controllers/e2e.controller.ts +++ b/packages/cli/src/controllers/e2e.controller.ts @@ -4,7 +4,7 @@ import config from '@/config'; import { SettingsRepository } from '@db/repositories/settings.repository'; import { UserRepository } from '@db/repositories/user.repository'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import { eventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { MessageEventBus } from '@/eventbus'; import { License } from '@/License'; import { LICENSE_FEATURES, inE2ETests } from '@/constants'; import { NoAuthRequired, Patch, Post, RestController } from '@/decorators'; @@ -91,6 +91,7 @@ export class E2EController { private readonly cacheService: CacheService, private readonly push: Push, private readonly passwordUtility: PasswordUtility, + private readonly eventBus: MessageEventBus, ) { license.isFeatureEnabled = (feature: BooleanLicenseFeature) => this.enabledFeatures[feature] ?? false; @@ -136,8 +137,8 @@ export class E2EController { } private async resetLogStreaming() { - for (const id in eventBus.destinations) { - await eventBus.removeDestination(id, false); + for (const id in this.eventBus.destinations) { + await this.eventBus.removeDestination(id, false); } } diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 5ceef85f6e..e6784327c4 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,7 +1,18 @@ -import { jsonParse } from 'n8n-workflow'; -import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; +import { Service } from 'typedi'; import type { DeleteResult } from 'typeorm'; import { In } from 'typeorm'; +import EventEmitter from 'events'; +import uniqby from 'lodash/uniqBy'; +import { jsonParse } from 'n8n-workflow'; +import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; + +import config from '@/config'; +import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; +import { ExecutionRepository } from '@db/repositories/execution.repository'; +import { WorkflowRepository } from '@db/repositories/workflow.repository'; +import { OrchestrationService } from '@/services/orchestration.service'; +import { Logger } from '@/Logger'; + import type { EventMessageTypes, EventNamesTypes, @@ -9,10 +20,7 @@ import type { } from '../EventMessageClasses/'; import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee'; import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter'; -import EventEmitter from 'events'; -import config from '@/config'; import { messageEventBusDestinationFromDb } from '../MessageEventBusDestination/MessageEventBusDestinationFromDb'; -import uniqby from 'lodash/uniqBy'; import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm'; import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessageAudit'; import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit'; @@ -25,16 +33,10 @@ import { EventMessageGeneric, eventMessageGenericDestinationTestEvent, } from '../EventMessageClasses/EventMessageGeneric'; -import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; -import { Container, Service } from 'typedi'; -import { ExecutionRepository } from '@db/repositories/execution.repository'; -import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { OrchestrationService } from '@/services/orchestration.service'; -import { Logger } from '@/Logger'; -import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; +import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -50,7 +52,7 @@ export interface MessageEventBusInitializeOptions { @Service() export class MessageEventBus extends EventEmitter { - isInitialized: boolean; + private isInitialized = false; logWriter: MessageEventBusLogWriter; @@ -60,9 +62,15 @@ export class MessageEventBus extends EventEmitter { private pushIntervalTimer: NodeJS.Timer; - constructor(private readonly logger: Logger) { + constructor( + private readonly logger: Logger, + private readonly executionRepository: ExecutionRepository, + private readonly eventDestinationsRepository: EventDestinationsRepository, + private readonly workflowRepository: WorkflowRepository, + private readonly orchestrationService: OrchestrationService, + private readonly recoveryService: ExecutionDataRecoveryService, + ) { super(); - this.isInitialized = false; } /** @@ -80,7 +88,7 @@ export class MessageEventBus extends EventEmitter { this.logger.debug('Initializing event bus...'); - const savedEventDestinations = await Container.get(EventDestinationsRepository).find({}); + const savedEventDestinations = await this.eventDestinationsRepository.find({}); if (savedEventDestinations.length > 0) { for (const destinationData of savedEventDestinations) { try { @@ -132,7 +140,7 @@ export class MessageEventBus extends EventEmitter { // crashing, so we can't just mark them as crashed if (config.get('executions.mode') !== 'queue') { const dbUnfinishedExecutionIds = ( - await Container.get(ExecutionRepository).find({ + await this.executionRepository.find({ where: { status: In(['running', 'new', 'unknown']), }, @@ -147,7 +155,7 @@ export class MessageEventBus extends EventEmitter { if (unfinishedExecutionIds.length > 0) { this.logger.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); this.logger.info('This could be due to a crash of an active workflow or a restart of n8n.'); - const activeWorkflows = await Container.get(WorkflowRepository).find({ + const activeWorkflows = await this.workflowRepository.find({ where: { active: true }, select: ['id', 'name'], }); @@ -159,7 +167,7 @@ export class MessageEventBus extends EventEmitter { } const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { - await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds); + await this.executionRepository.markAsCrashed(unfinishedExecutionIds); // if we end up here, it means that the previous recovery process did not finish // a possible reason would be that recreating the workflow data itself caused e.g an OOM error // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed @@ -174,9 +182,9 @@ export class MessageEventBus extends EventEmitter { this.logger.debug( `No event messages found, marking execution ${executionId} as 'crashed'`, ); - await Container.get(ExecutionRepository).markAsCrashed([executionId]); + await this.executionRepository.markAsCrashed([executionId]); } else { - await recoverExecutionDataFromEventLogMessages( + await this.recoveryService.recoverExecutionData( executionId, unsentAndUnfinished.unfinishedExecutions[executionId], true, @@ -207,7 +215,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await Container.get(OrchestrationService).publish('restartEventBus'); + await this.orchestrationService.publish('restartEventBus'); } return destination; } @@ -233,7 +241,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await Container.get(OrchestrationService).publish('restartEventBus'); + await this.orchestrationService.publish('restartEventBus'); } return result; } @@ -243,7 +251,7 @@ export class MessageEventBus extends EventEmitter { if (eventData) { const eventMessage = getEventMessageObjectByType(eventData); if (eventMessage) { - await Container.get(MessageEventBus).send(eventMessage); + await this.send(eventMessage); } } return eventData; @@ -370,7 +378,7 @@ export class MessageEventBus extends EventEmitter { .slice(-amount); for (const execution of filteredExecutionIds) { - const data = await recoverExecutionDataFromEventLogMessages( + const data = await this.recoveryService.recoverExecutionData( execution.executionId, queryResult, false, @@ -450,5 +458,3 @@ export class MessageEventBus extends EventEmitter { await this.send(new EventMessageNode(options)); } } - -export const eventBus = Container.get(MessageEventBus); diff --git a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts deleted file mode 100644 index 6aa3fe616c..0000000000 --- a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts +++ /dev/null @@ -1,205 +0,0 @@ -import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; -import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow'; -import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses'; -import type { DateTime } from 'luxon'; -import { Push } from '@/push'; -import { Container } from 'typedi'; -import { InternalHooks } from '@/InternalHooks'; -import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; -import { ExecutionRepository } from '@db/repositories/execution.repository'; - -export async function recoverExecutionDataFromEventLogMessages( - executionId: string, - messages: EventMessageTypes[], - applyToDb: boolean, -): Promise { - const executionEntry = await Container.get(ExecutionRepository).findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); - - if (executionEntry && messages) { - let executionData = executionEntry.data; - let workflowError: WorkflowOperationError | undefined; - if (!executionData) { - executionData = { resultData: { runData: {} } }; - } - let nodeNames: string[] = []; - if ( - executionData?.resultData?.runData && - Object.keys(executionData.resultData.runData).length > 0 - ) { - } else { - if (!executionData.resultData) { - executionData.resultData = { - runData: {}, - }; - } else { - if (!executionData.resultData.runData) { - executionData.resultData.runData = {}; - } - } - } - nodeNames = executionEntry.workflowData.nodes.map((n) => n.name); - - let lastNodeRunTimestamp: DateTime | undefined = undefined; - - for (const nodeName of nodeNames) { - const nodeByName = executionEntry?.workflowData.nodes.find((n) => n.name === nodeName); - - if (!nodeByName) continue; - - const nodeStartedMessage = messages.find( - (message) => - message.eventName === 'n8n.node.started' && message.payload.nodeName === nodeName, - ); - const nodeFinishedMessage = messages.find( - (message) => - message.eventName === 'n8n.node.finished' && message.payload.nodeName === nodeName, - ); - - const executionTime = - nodeStartedMessage && nodeFinishedMessage - ? nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis() - : 0; - - let taskData: ITaskData; - if (executionData.resultData.runData[nodeName]?.length > 0) { - taskData = executionData.resultData.runData[nodeName][0]; - } else { - taskData = { - startTime: nodeStartedMessage ? nodeStartedMessage.ts.toUnixInteger() : 0, - executionTime, - source: [null], - executionStatus: 'unknown', - }; - } - - if (nodeStartedMessage && !nodeFinishedMessage) { - const nodeError = new NodeOperationError( - nodeByName, - 'Node crashed, possible out-of-memory issue', - { - message: 'Execution stopped at this node', - description: - "n8n may have run out of memory while executing it. More context and tips on how to avoid this in the docs", - }, - ); - workflowError = new WorkflowOperationError( - 'Workflow did not finish, possible out-of-memory issue', - ); - taskData.error = nodeError; - taskData.executionStatus = 'crashed'; - executionData.resultData.lastNodeExecuted = nodeName; - if (nodeStartedMessage) lastNodeRunTimestamp = nodeStartedMessage.ts; - } else if (nodeStartedMessage && nodeFinishedMessage) { - taskData.executionStatus = 'success'; - if (taskData.data === undefined) { - taskData.data = { - main: [ - [ - { - json: { - isArtificialRecoveredEventItem: true, - }, - pairedItem: undefined, - }, - ], - ], - }; - } - } - - if (!executionData.resultData.runData[nodeName]) { - executionData.resultData.runData[nodeName] = [taskData]; - } - } - - if (!lastNodeRunTimestamp) { - const workflowEndedMessage = messages.find((message) => - ( - [ - 'n8n.workflow.success', - 'n8n.workflow.crashed', - 'n8n.workflow.failed', - ] as EventNamesTypes[] - ).includes(message.eventName), - ); - if (workflowEndedMessage) { - lastNodeRunTimestamp = workflowEndedMessage.ts; - } else { - if (!workflowError) { - workflowError = new WorkflowOperationError( - 'Workflow did not finish, possible out-of-memory issue', - ); - } - const workflowStartedMessage = messages.find( - (message) => message.eventName === 'n8n.workflow.started', - ); - if (workflowStartedMessage) { - lastNodeRunTimestamp = workflowStartedMessage.ts; - } - } - } - - if (!executionData.resultData.error && workflowError) { - executionData.resultData.error = workflowError; - } - - if (applyToDb) { - const newStatus = executionEntry.status === 'failed' ? 'failed' : 'crashed'; - await Container.get(ExecutionRepository).updateExistingExecution(executionId, { - data: executionData, - status: newStatus, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - }); - await Container.get(InternalHooks).onWorkflowPostExecute( - executionId, - executionEntry.workflowData, - { - data: executionData, - finished: false, - mode: executionEntry.mode, - waitTill: executionEntry.waitTill ?? undefined, - startedAt: executionEntry.startedAt, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - status: newStatus, - }, - ); - const iRunData: IRun = { - data: executionData, - finished: false, - mode: executionEntry.mode, - waitTill: executionEntry.waitTill ?? undefined, - startedAt: executionEntry.startedAt, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - status: newStatus, - }; - const workflowHooks = getWorkflowHooksMain( - { - userId: '', - workflowData: executionEntry.workflowData, - executionMode: executionEntry.mode, - executionData, - runData: executionData.resultData.runData, - retryOf: executionEntry.retryOf, - }, - executionId, - ); - - // execute workflowExecuteAfter hook to trigger error workflow - await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]); - - const push = Container.get(Push); - // wait for UI to be back up and send the execution data - push.once('editorUiConnected', function handleUiBackUp() { - // add a small timeout to make sure the UI is back up - setTimeout(() => { - push.broadcast('executionRecovered', { executionId }); - }, 1000); - }); - } - return executionData; - } - return; -} diff --git a/packages/cli/src/eventbus/eventBus.controller.ee.ts b/packages/cli/src/eventbus/eventBus.controller.ee.ts index f88628bfec..216496092e 100644 --- a/packages/cli/src/eventbus/eventBus.controller.ee.ts +++ b/packages/cli/src/eventbus/eventBus.controller.ee.ts @@ -1,5 +1,15 @@ import express from 'express'; -import { eventBus } from './MessageEventBus/MessageEventBus'; +import type { + MessageEventBusDestinationWebhookOptions, + MessageEventBusDestinationOptions, +} from 'n8n-workflow'; +import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; + +import { RestController, Get, Post, Delete, Authorized, RequireGlobalScope } from '@/decorators'; +import { AuthenticatedRequest } from '@/requests'; +import { BadRequestError } from '@/errors/response-errors/bad-request.error'; + +import { MessageEventBus } from './MessageEventBus/MessageEventBus'; import { isMessageEventBusDestinationSentryOptions, MessageEventBusDestinationSentry, @@ -9,16 +19,8 @@ import { MessageEventBusDestinationSyslog, } from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; -import type { - MessageEventBusDestinationWebhookOptions, - MessageEventBusDestinationOptions, -} from 'n8n-workflow'; -import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; -import { RestController, Get, Post, Delete, Authorized, RequireGlobalScope } from '@/decorators'; import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee'; -import { AuthenticatedRequest } from '@/requests'; import { logStreamingLicensedMiddleware } from './middleware/logStreamingEnabled.middleware.ee'; -import { BadRequestError } from '@/errors/response-errors/bad-request.error'; // ---------------------------------------- // TypeGuards @@ -53,6 +55,8 @@ const isMessageEventBusDestinationOptions = ( @Authorized() @RestController('/eventbus') export class EventBusControllerEE { + constructor(private readonly eventBus: MessageEventBus) {} + // ---------------------------------------- // Destinations // ---------------------------------------- @@ -61,9 +65,9 @@ export class EventBusControllerEE { @RequireGlobalScope('eventBusDestination:list') async getDestination(req: express.Request): Promise { if (isWithIdString(req.query)) { - return await eventBus.findDestination(req.query.id); + return await this.eventBus.findDestination(req.query.id); } else { - return await eventBus.findDestination(); + return await this.eventBus.findDestination(); } } @@ -75,22 +79,22 @@ export class EventBusControllerEE { switch (req.body.__type) { case MessageEventBusDestinationTypeNames.sentry: if (isMessageEventBusDestinationSentryOptions(req.body)) { - result = await eventBus.addDestination( - new MessageEventBusDestinationSentry(eventBus, req.body), + result = await this.eventBus.addDestination( + new MessageEventBusDestinationSentry(this.eventBus, req.body), ); } break; case MessageEventBusDestinationTypeNames.webhook: if (isMessageEventBusDestinationWebhookOptions(req.body)) { - result = await eventBus.addDestination( - new MessageEventBusDestinationWebhook(eventBus, req.body), + result = await this.eventBus.addDestination( + new MessageEventBusDestinationWebhook(this.eventBus, req.body), ); } break; case MessageEventBusDestinationTypeNames.syslog: if (isMessageEventBusDestinationSyslogOptions(req.body)) { - result = await eventBus.addDestination( - new MessageEventBusDestinationSyslog(eventBus, req.body), + result = await this.eventBus.addDestination( + new MessageEventBusDestinationSyslog(this.eventBus, req.body), ); } break; @@ -115,7 +119,7 @@ export class EventBusControllerEE { @RequireGlobalScope('eventBusDestination:test') async sendTestMessage(req: express.Request): Promise { if (isWithIdString(req.query)) { - return await eventBus.testDestination(req.query.id); + return await this.eventBus.testDestination(req.query.id); } return false; } @@ -124,7 +128,7 @@ export class EventBusControllerEE { @RequireGlobalScope('eventBusDestination:delete') async deleteDestination(req: AuthenticatedRequest) { if (isWithIdString(req.query)) { - return await eventBus.removeDestination(req.query.id); + return await this.eventBus.removeDestination(req.query.id); } else { throw new BadRequestError('Query is missing id'); } diff --git a/packages/cli/src/eventbus/eventBus.controller.ts b/packages/cli/src/eventbus/eventBus.controller.ts index f10b4e5674..a76c90ddc8 100644 --- a/packages/cli/src/eventbus/eventBus.controller.ts +++ b/packages/cli/src/eventbus/eventBus.controller.ts @@ -1,21 +1,23 @@ import express from 'express'; +import type { IRunExecutionData } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; + +import { RestController, Get, Post, Authorized, RequireGlobalScope } from '@/decorators'; +import { BadRequestError } from '@/errors/response-errors/bad-request.error'; + import { isEventMessageOptions } from './EventMessageClasses/AbstractEventMessage'; import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric'; import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMessageWorkflow'; import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow'; import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus'; -import { eventBus } from './MessageEventBus/MessageEventBus'; +import { MessageEventBus } from './MessageEventBus/MessageEventBus'; import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses'; import { eventNamesAll } from './EventMessageClasses'; import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit'; import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit'; -import type { IRunExecutionData } from 'n8n-workflow'; -import { EventMessageTypeNames } from 'n8n-workflow'; import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode'; import { EventMessageNode } from './EventMessageClasses/EventMessageNode'; -import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents'; -import { RestController, Get, Post, Authorized, RequireGlobalScope } from '@/decorators'; -import { BadRequestError } from '@/errors/response-errors/bad-request.error'; +import { ExecutionDataRecoveryService } from './executionDataRecovery.service'; // ---------------------------------------- // TypeGuards @@ -34,6 +36,11 @@ const isWithQueryString = (candidate: unknown): candidate is { query: string } = @Authorized() @RestController('/eventbus') export class EventBusController { + constructor( + private readonly eventBus: MessageEventBus, + private readonly recoveryService: ExecutionDataRecoveryService, + ) {} + // ---------------------------------------- // Events // ---------------------------------------- @@ -45,17 +52,17 @@ export class EventBusController { if (isWithQueryString(req.query)) { switch (req.query.query as EventMessageReturnMode) { case 'sent': - return await eventBus.getEventsSent(); + return await this.eventBus.getEventsSent(); case 'unsent': - return await eventBus.getEventsUnsent(); + return await this.eventBus.getEventsUnsent(); case 'unfinished': - return await eventBus.getUnfinishedExecutions(); + return await this.eventBus.getUnfinishedExecutions(); case 'all': default: - return await eventBus.getEventsAll(); + return await this.eventBus.getEventsAll(); } } else { - return await eventBus.getEventsAll(); + return await this.eventBus.getEventsAll(); } } @@ -63,7 +70,7 @@ export class EventBusController { @RequireGlobalScope('eventBusEvent:list') async getFailedEvents(req: express.Request): Promise { const amount = parseInt(req.query?.amount as string) ?? 5; - return await eventBus.getEventsFailed(amount); + return await this.eventBus.getEventsFailed(amount); } @Get('/execution/:id') @@ -74,7 +81,7 @@ export class EventBusController { if (req.query?.logHistory) { logHistory = parseInt(req.query.logHistory as string, 10); } - return await eventBus.getEventsByExecutionId(req.params.id, logHistory); + return await this.eventBus.getEventsByExecutionId(req.params.id, logHistory); } return; } @@ -86,9 +93,9 @@ export class EventBusController { if (req.params?.id) { const logHistory = parseInt(req.query.logHistory as string, 10) || undefined; const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true; - const messages = await eventBus.getEventsByExecutionId(id, logHistory); + const messages = await this.eventBus.getEventsByExecutionId(id, logHistory); if (messages.length > 0) { - return await recoverExecutionDataFromEventLogMessages(id, messages, applyToDb); + return await this.recoveryService.recoverExecutionData(id, messages, applyToDb); } } return; @@ -113,7 +120,7 @@ export class EventBusController { default: msg = new EventMessageGeneric(req.body); } - await eventBus.send(msg); + await this.eventBus.send(msg); } else { throw new BadRequestError( 'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}', diff --git a/packages/cli/src/eventbus/executionDataRecovery.service.ts b/packages/cli/src/eventbus/executionDataRecovery.service.ts new file mode 100644 index 0000000000..80251bd92f --- /dev/null +++ b/packages/cli/src/eventbus/executionDataRecovery.service.ts @@ -0,0 +1,212 @@ +import { Container, Service } from 'typedi'; +import type { DateTime } from 'luxon'; +import { Push } from '@/push'; +import { InternalHooks } from '@/InternalHooks'; +import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; +import { NodeOperationError, WorkflowOperationError, sleep } from 'n8n-workflow'; + +import { ExecutionRepository } from '@db/repositories/execution.repository'; +import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; +import type { EventMessageTypes, EventNamesTypes } from './EventMessageClasses'; + +@Service() +export class ExecutionDataRecoveryService { + constructor( + private readonly push: Push, + private readonly executionRepository: ExecutionRepository, + ) {} + + async recoverExecutionData( + executionId: string, + messages: EventMessageTypes[], + applyToDb: boolean, + ): Promise { + const executionEntry = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + if (executionEntry && messages) { + let executionData = executionEntry.data; + let workflowError: WorkflowOperationError | undefined; + if (!executionData) { + executionData = { resultData: { runData: {} } }; + } + let nodeNames: string[] = []; + if ( + executionData?.resultData?.runData && + Object.keys(executionData.resultData.runData).length > 0 + ) { + } else { + if (!executionData.resultData) { + executionData.resultData = { + runData: {}, + }; + } else { + if (!executionData.resultData.runData) { + executionData.resultData.runData = {}; + } + } + } + nodeNames = executionEntry.workflowData.nodes.map((n) => n.name); + + let lastNodeRunTimestamp: DateTime | undefined = undefined; + + for (const nodeName of nodeNames) { + const nodeByName = executionEntry?.workflowData.nodes.find((n) => n.name === nodeName); + + if (!nodeByName) continue; + + const nodeStartedMessage = messages.find( + (message) => + message.eventName === 'n8n.node.started' && message.payload.nodeName === nodeName, + ); + const nodeFinishedMessage = messages.find( + (message) => + message.eventName === 'n8n.node.finished' && message.payload.nodeName === nodeName, + ); + + const executionTime = + nodeStartedMessage && nodeFinishedMessage + ? nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis() + : 0; + + let taskData: ITaskData; + if (executionData.resultData.runData[nodeName]?.length > 0) { + taskData = executionData.resultData.runData[nodeName][0]; + } else { + taskData = { + startTime: nodeStartedMessage ? nodeStartedMessage.ts.toUnixInteger() : 0, + executionTime, + source: [null], + executionStatus: 'unknown', + }; + } + + if (nodeStartedMessage && !nodeFinishedMessage) { + const nodeError = new NodeOperationError( + nodeByName, + 'Node crashed, possible out-of-memory issue', + { + message: 'Execution stopped at this node', + description: + "n8n may have run out of memory while executing it. More context and tips on how to avoid this in the docs", + }, + ); + workflowError = new WorkflowOperationError( + 'Workflow did not finish, possible out-of-memory issue', + ); + taskData.error = nodeError; + taskData.executionStatus = 'crashed'; + executionData.resultData.lastNodeExecuted = nodeName; + if (nodeStartedMessage) lastNodeRunTimestamp = nodeStartedMessage.ts; + } else if (nodeStartedMessage && nodeFinishedMessage) { + taskData.executionStatus = 'success'; + if (taskData.data === undefined) { + taskData.data = { + main: [ + [ + { + json: { + isArtificialRecoveredEventItem: true, + }, + pairedItem: undefined, + }, + ], + ], + }; + } + } + + if (!executionData.resultData.runData[nodeName]) { + executionData.resultData.runData[nodeName] = [taskData]; + } + } + + if (!lastNodeRunTimestamp) { + const workflowEndedMessage = messages.find((message) => + ( + [ + 'n8n.workflow.success', + 'n8n.workflow.crashed', + 'n8n.workflow.failed', + ] as EventNamesTypes[] + ).includes(message.eventName), + ); + if (workflowEndedMessage) { + lastNodeRunTimestamp = workflowEndedMessage.ts; + } else { + if (!workflowError) { + workflowError = new WorkflowOperationError( + 'Workflow did not finish, possible out-of-memory issue', + ); + } + const workflowStartedMessage = messages.find( + (message) => message.eventName === 'n8n.workflow.started', + ); + if (workflowStartedMessage) { + lastNodeRunTimestamp = workflowStartedMessage.ts; + } + } + } + + if (!executionData.resultData.error && workflowError) { + executionData.resultData.error = workflowError; + } + + if (applyToDb) { + const newStatus = executionEntry.status === 'failed' ? 'failed' : 'crashed'; + await this.executionRepository.updateExistingExecution(executionId, { + data: executionData, + status: newStatus, + stoppedAt: lastNodeRunTimestamp?.toJSDate(), + }); + await Container.get(InternalHooks).onWorkflowPostExecute( + executionId, + executionEntry.workflowData, + { + data: executionData, + finished: false, + mode: executionEntry.mode, + waitTill: executionEntry.waitTill ?? undefined, + startedAt: executionEntry.startedAt, + stoppedAt: lastNodeRunTimestamp?.toJSDate(), + status: newStatus, + }, + ); + const iRunData: IRun = { + data: executionData, + finished: false, + mode: executionEntry.mode, + waitTill: executionEntry.waitTill ?? undefined, + startedAt: executionEntry.startedAt, + stoppedAt: lastNodeRunTimestamp?.toJSDate(), + status: newStatus, + }; + const workflowHooks = getWorkflowHooksMain( + { + userId: '', + workflowData: executionEntry.workflowData, + executionMode: executionEntry.mode, + executionData, + runData: executionData.resultData.runData, + retryOf: executionEntry.retryOf, + }, + executionId, + ); + + // execute workflowExecuteAfter hook to trigger error workflow + await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]); + + // wait for UI to be back up and send the execution data + this.push.once('editorUiConnected', async () => { + // add a small timeout to make sure the UI is back up + await sleep(1000); + this.push.broadcast('executionRecovered', { executionId }); + }); + } + return executionData; + } + return; + } +} diff --git a/packages/cli/src/eventbus/index.ts b/packages/cli/src/eventbus/index.ts index 1b3b48d7af..7118b57bd2 100644 --- a/packages/cli/src/eventbus/index.ts +++ b/packages/cli/src/eventbus/index.ts @@ -1 +1,4 @@ -export { eventBus } from './MessageEventBus/MessageEventBus'; +export { MessageEventBus } from './MessageEventBus/MessageEventBus'; +export { EventMessageTypes } from './EventMessageClasses'; +export { EventPayloadWorkflow } from './EventMessageClasses/EventMessageWorkflow'; +export { METRICS_EVENT_NAME, getLabelsForEvent } from './MessageEventBusDestination/Helpers.ee'; diff --git a/packages/cli/src/services/metrics.service.ts b/packages/cli/src/services/metrics.service.ts index 3d2ba25e6a..8c185f39bf 100644 --- a/packages/cli/src/services/metrics.service.ts +++ b/packages/cli/src/services/metrics.service.ts @@ -8,12 +8,12 @@ import { Service } from 'typedi'; import EventEmitter from 'events'; import { CacheService } from '@/services/cache/cache.service'; -import type { EventMessageTypes } from '@/eventbus/EventMessageClasses'; import { + MessageEventBus, METRICS_EVENT_NAME, getLabelsForEvent, -} from '@/eventbus/MessageEventBusDestination/Helpers.ee'; -import { eventBus } from '@/eventbus'; + type EventMessageTypes, +} from '@/eventbus'; import { Logger } from '@/Logger'; @Service() @@ -21,6 +21,7 @@ export class MetricsService extends EventEmitter { constructor( private readonly logger: Logger, private readonly cacheService: CacheService, + private readonly eventBus: MessageEventBus, ) { super(); } @@ -151,7 +152,7 @@ export class MetricsService extends EventEmitter { if (!config.getEnv('endpoints.metrics.includeMessageEventBusMetrics')) { return; } - eventBus.on(METRICS_EVENT_NAME, (event: EventMessageTypes) => { + this.eventBus.on(METRICS_EVENT_NAME, (event: EventMessageTypes) => { const counter = this.getCounterForEvent(event); if (!counter) return; counter.inc(1); diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index 25e761914f..dbded51289 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -1,9 +1,9 @@ +import { Container } from 'typedi'; import config from '@/config'; import axios from 'axios'; import syslog from 'syslog-client'; import { v4 as uuid } from 'uuid'; import type { SuperAgentTest } from 'supertest'; -import type { User } from '@db/entities/User'; import type { MessageEventBusDestinationSentryOptions, MessageEventBusDestinationSyslogOptions, @@ -14,7 +14,9 @@ import { defaultMessageEventBusDestinationSyslogOptions, defaultMessageEventBusDestinationWebhookOptions, } from 'n8n-workflow'; -import { eventBus } from '@/eventbus'; + +import type { User } from '@db/entities/User'; +import { MessageEventBus } from '@/eventbus'; import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import type { MessageEventBusDestinationSyslog } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; import type { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; @@ -23,9 +25,11 @@ import { EventMessageAudit } from '@/eventbus/EventMessageClasses/EventMessageAu import type { EventNamesTypes } from '@/eventbus/EventMessageClasses'; import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode'; +import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; import * as utils from './shared/utils'; import { createUser } from './shared/db/users'; +import { mockInstance } from '../shared/mocking'; jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); jest.mock('axios'); @@ -64,6 +68,8 @@ const testSentryDestination: MessageEventBusDestinationSentryOptions = { subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], }; +let eventBus: MessageEventBus; + async function confirmIdInAll(id: string) { const sent = await eventBus.getEventsAll(); expect(sent.length).toBeGreaterThan(0); @@ -76,6 +82,7 @@ async function confirmIdSent(id: string) { expect(sent.find((msg) => msg.id === id)).toBeTruthy(); } +mockInstance(ExecutionDataRecoveryService); const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], enabledFeatures: ['feat:logStreaming'], @@ -90,12 +97,13 @@ beforeAll(async () => { config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.keepLogCount', 1); - await eventBus.initialize({}); + eventBus = Container.get(MessageEventBus); + await eventBus.initialize(); }); afterAll(async () => { jest.mock('@/eventbus/MessageEventBus/MessageEventBus'); - await eventBus.close(); + await eventBus?.close(); }); test('should have a running logwriter process', () => { diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 9f6581d049..3a441c7885 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -1,7 +1,12 @@ import type { SuperAgentTest } from 'supertest'; -import * as utils from './shared/utils/'; + import type { User } from '@db/entities/User'; +import { MessageEventBus } from '@/eventbus'; +import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; + +import * as utils from './shared/utils/'; import { createUser } from './shared/db/users'; +import { mockInstance } from '../shared/mocking'; /** * NOTE: due to issues with mocking the MessageEventBus in multiple tests running in parallel, @@ -12,6 +17,8 @@ import { createUser } from './shared/db/users'; let owner: User; let authOwnerAgent: SuperAgentTest; +mockInstance(MessageEventBus); +mockInstance(ExecutionDataRecoveryService); const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], enabledFeatures: [], // do not enable logstreaming diff --git a/packages/cli/test/integration/metrics.test.ts b/packages/cli/test/integration/metrics.test.ts index ef6d2db665..394ec026c4 100644 --- a/packages/cli/test/integration/metrics.test.ts +++ b/packages/cli/test/integration/metrics.test.ts @@ -1,11 +1,16 @@ -import { setupTestServer } from './shared/utils'; -import config from '@/config'; -import request from 'supertest'; -import Container from 'typedi'; -import { MetricsService } from '@/services/metrics.service'; -import { N8N_VERSION } from '@/constants'; +import { Container } from 'typedi'; import { parse as semverParse } from 'semver'; +import request from 'supertest'; +import config from '@/config'; +import { N8N_VERSION } from '@/constants'; +import { MetricsService } from '@/services/metrics.service'; +import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; + +import { setupTestServer } from './shared/utils'; +import { mockInstance } from '../shared/mocking'; + +mockInstance(ExecutionDataRecoveryService); jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); config.set('endpoints.metrics.enable', true); config.set('endpoints.metrics.includeDefaultMetrics', false); diff --git a/packages/cli/test/integration/workflows/workflow.service.test.ts b/packages/cli/test/integration/workflows/workflow.service.test.ts index 056c11ca29..996b3d0d86 100644 --- a/packages/cli/test/integration/workflows/workflow.service.test.ts +++ b/packages/cli/test/integration/workflows/workflow.service.test.ts @@ -3,6 +3,7 @@ import { mock } from 'jest-mock-extended'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; +import { MessageEventBus } from '@/eventbus'; import { Telemetry } from '@/telemetry'; import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowService } from '@/workflows/workflow.service'; @@ -13,16 +14,14 @@ import { createOwner } from '../shared/db/users'; import { createWorkflow } from '../shared/db/workflows'; let workflowService: WorkflowService; -let activeWorkflowRunner: ActiveWorkflowRunner; -let orchestrationService: OrchestrationService; +const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); +const orchestrationService = mockInstance(OrchestrationService); +mockInstance(MessageEventBus); +mockInstance(Telemetry); beforeAll(async () => { await testDb.init(); - activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); - orchestrationService = mockInstance(OrchestrationService); - mockInstance(Telemetry); - workflowService = new WorkflowService( mock(), mock(), diff --git a/packages/cli/test/unit/InternalHooks.test.ts b/packages/cli/test/unit/InternalHooks.test.ts index e91759dadd..4bd10033ac 100644 --- a/packages/cli/test/unit/InternalHooks.test.ts +++ b/packages/cli/test/unit/InternalHooks.test.ts @@ -12,7 +12,7 @@ let telemetry: Telemetry; describe('InternalHooks', () => { beforeAll(() => { telemetry = mockInstance(Telemetry); - internalHooks = new InternalHooks(telemetry, mock(), mock(), mock(), mock()); + internalHooks = new InternalHooks(telemetry, mock(), mock(), mock(), mock(), mock()); }); it('Should be defined', () => { diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 6d6bd16822..2dfd1519a6 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -2,7 +2,7 @@ import Container from 'typedi'; import config from '@/config'; import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; -import { eventBus } from '@/eventbus'; +import { MessageEventBus } from '@/eventbus'; import { RedisService } from '@/services/redis.service'; import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handleWorkerResponseMessageMain'; import { handleCommandMessageMain } from '@/services/orchestration/main/handleCommandMessageMain'; @@ -37,9 +37,11 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { describe('Orchestration Service', () => { const logger = mockInstance(Logger); mockInstance(Push); + mockInstance(RedisService); + mockInstance(ExternalSecretsManager); + const eventBus = mockInstance(MessageEventBus); + beforeAll(async () => { - mockInstance(RedisService); - mockInstance(ExternalSecretsManager); jest.mock('ioredis', () => { const Redis = require('ioredis-mock'); if (typeof Redis === 'object') { @@ -110,8 +112,7 @@ describe('Orchestration Service', () => { expect(logger.error).toHaveBeenCalled(); }); - test('should reject command messages from iteslf', async () => { - jest.spyOn(eventBus, 'restart'); + test('should reject command messages from itself', async () => { const response = await handleCommandMessageMain( JSON.stringify({ ...workerRestartEventbusResponse, senderId: queueModeId }), ); @@ -119,7 +120,6 @@ describe('Orchestration Service', () => { expect(response!.command).toEqual('restartEventBus'); expect(response!.senderId).toEqual(queueModeId); expect(eventBus.restart).not.toHaveBeenCalled(); - jest.spyOn(eventBus, 'restart').mockRestore(); }); test('should send command messages', async () => {