refactor(core): Organize all event maps (#10997)

This commit is contained in:
Iván Ovejero 2024-09-30 10:44:03 +02:00 committed by GitHub
parent 3a65bdc1f5
commit 63e6f1fa38
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 161 additions and 159 deletions

View file

@ -13,7 +13,7 @@ import { generateHostInstanceId } from '@/databases/utils/generators';
import * as Db from '@/db';
import { initErrorHandling } from '@/error-reporting';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import { initExpressionEvaluator } from '@/expression-evaluator';
import { ExternalHooks } from '@/external-hooks';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';

View file

@ -6,7 +6,7 @@ import config from '@/config';
import { N8N_VERSION, inTest } from '@/constants';
import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import type { ScalingService } from '@/scaling/scaling.service';

View file

@ -1,5 +1,5 @@
import { RedactableError } from '@/errors/redactable.error';
import type { UserLike } from '@/events/relay-event-map';
import type { UserLike } from '@/events/maps/relay.event-map';
function toRedactable(userLike: UserLike) {
return {

View file

@ -3,8 +3,8 @@ import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import type { RelayEventMap } from '@/events/relay-event-map';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import type { IWorkflowDb } from '@/interfaces';
describe('LogStreamingEventRelay', () => {

View file

@ -9,8 +9,8 @@ import type { ProjectRelationRepository } from '@/databases/repositories/project
import type { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/relay-event-map';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import type { IWorkflowDb } from '@/interfaces';
import type { License } from '@/license';
import type { NodeTypes } from '@/node-types';

View file

@ -2,11 +2,12 @@ import { Service } from 'typedi';
import { TypedEmitter } from '@/typed-emitter';
import type { AiEventMap } from './ai-event-map';
import type { QueueMetricsEventMap } from './queue-metrics-event-map';
import type { RelayEventMap } from './relay-event-map';
import type { AiEventMap } from './maps/ai.event-map';
import type { PubSubEventMap } from './maps/pub-sub.event-map';
import type { QueueMetricsEventMap } from './maps/queue-metrics.event-map';
import type { RelayEventMap } from './maps/relay.event-map';
type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap;
type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap & PubSubEventMap;
@Service()
export class EventService extends TypedEmitter<EventMap> {}

View file

@ -0,0 +1,104 @@
import type { WorkerStatus, PushType } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
export type PubSubEventMap = PubSubCommandMap & PubSubWorkerResponseMap;
export type PubSubCommandMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': never;
'reload-external-secrets-providers': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': never;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};
export type PubSubWorkerResponseMap = {
// #region Lifecycle
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
};

View file

@ -11,7 +11,7 @@ import type { ProjectRole } from '@/databases/entities/project-relation';
import type { GlobalRole } from '@/databases/entities/user';
import type { IWorkflowDb } from '@/interfaces';
import type { AiEventMap } from './ai-event-map';
import type { AiEventMap } from './ai.event-map';
export type UserLike = {
id: string;

View file

@ -1,8 +1,7 @@
import { Service } from 'typedi';
import type { RelayEventMap } from '@/events/relay-event-map';
import { EventService } from './event.service';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
@Service()
export class EventRelay {

View file

@ -3,10 +3,9 @@ import { Service } from 'typedi';
import { Redactable } from '@/decorators/redactable';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventRelay } from '@/events/event-relay';
import type { RelayEventMap } from '@/events/relay-event-map';
import { EventService } from './event.service';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { EventRelay } from '@/events/relays/event-relay';
@Service()
export class LogStreamingEventRelay extends EventRelay {

View file

@ -12,14 +12,14 @@ import { ProjectRelationRepository } from '@/databases/repositories/project-rela
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/relay-event-map';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { determineFinalExecutionStatus } from '@/execution-lifecycle-hooks/shared/shared-hook-functions';
import type { IExecutionTrackProperties } from '@/interfaces';
import { License } from '@/license';
import { NodeTypes } from '@/node-types';
import { EventRelay } from './event-relay';
import { Telemetry } from '../telemetry';
import { Telemetry } from '../../telemetry';
@Service()
export class TelemetryEventRelay extends EventRelay {

View file

@ -1,6 +1,4 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
import type { PubSubCommandMap, PubSubWorkerResponseMap } from '@/events/maps/pub-sub.event-map';
import type { Resolve } from '@/utlity.types';
import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants';
@ -20,92 +18,17 @@ export namespace PubSub {
// commands
// ----------------------------------
export type CommandMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': never;
'reload-external-secrets-providers': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': never;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};
type _ToCommand<CommandKey extends keyof CommandMap> = {
type _ToCommand<CommandKey extends keyof PubSubCommandMap> = {
senderId: string;
targets?: string[];
command: CommandKey;
} & (CommandMap[CommandKey] extends never
} & (PubSubCommandMap[CommandKey] extends never
? { payload?: never } // some commands carry no payload
: { payload: CommandMap[CommandKey] });
: { payload: PubSubCommandMap[CommandKey] });
type ToCommand<CommandKey extends keyof CommandMap> = Resolve<_ToCommand<CommandKey>>;
type ToCommand<CommandKey extends keyof PubSubCommandMap> = Resolve<_ToCommand<CommandKey>>;
namespace Command {
namespace Commands {
export type ReloadLicense = ToCommand<'reload-license'>;
export type RestartEventBus = ToCommand<'restart-event-bus'>;
export type ReloadExternalSecretsProviders = ToCommand<'reload-external-secrets-providers'>;
@ -125,63 +48,39 @@ export namespace PubSub {
/** Command sent via the `n8n.commands` pubsub channel. */
export type Command =
| Command.ReloadLicense
| Command.RestartEventBus
| Command.ReloadExternalSecretsProviders
| Command.CommunityPackageInstall
| Command.CommunityPackageUpdate
| Command.CommunityPackageUninstall
| Command.GetWorkerId
| Command.GetWorkerStatus
| Command.AddWebhooksTriggersAndPollers
| Command.RemoveTriggersAndPollers
| Command.DisplayWorkflowActivation
| Command.DisplayWorkflowDeactivation
| Command.DisplayWorkflowActivationError
| Command.RelayExecutionLifecycleEvent
| Command.ClearTestWebhooks;
| Commands.ReloadLicense
| Commands.RestartEventBus
| Commands.ReloadExternalSecretsProviders
| Commands.CommunityPackageInstall
| Commands.CommunityPackageUpdate
| Commands.CommunityPackageUninstall
| Commands.GetWorkerId
| Commands.GetWorkerStatus
| Commands.AddWebhooksTriggersAndPollers
| Commands.RemoveTriggersAndPollers
| Commands.DisplayWorkflowActivation
| Commands.DisplayWorkflowDeactivation
| Commands.DisplayWorkflowActivationError
| Commands.RelayExecutionLifecycleEvent
| Commands.ClearTestWebhooks;
// ----------------------------------
// worker responses
// ----------------------------------
export type WorkerResponseMap = {
// #region Lifecycle
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
};
type _ToWorkerResponse<WorkerResponseKey extends keyof WorkerResponseMap> = {
type _ToWorkerResponse<WorkerResponseKey extends keyof PubSubWorkerResponseMap> = {
workerId: string;
targets?: string[];
command: WorkerResponseKey;
} & (WorkerResponseMap[WorkerResponseKey] extends never
} & (PubSubWorkerResponseMap[WorkerResponseKey] extends never
? { payload?: never } // some responses carry no payload
: { payload: WorkerResponseMap[WorkerResponseKey] });
: { payload: PubSubWorkerResponseMap[WorkerResponseKey] });
type ToWorkerResponse<WorkerResponseKey extends keyof WorkerResponseMap> = Resolve<
type ToWorkerResponse<WorkerResponseKey extends keyof PubSubWorkerResponseMap> = Resolve<
_ToWorkerResponse<WorkerResponseKey>
>;
namespace WorkerResponse {
namespace WorkerResponses {
export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>;
export type ReloadExternalSecretsProviders =
ToWorkerResponse<'reload-external-secrets-providers'>;
@ -191,8 +90,8 @@ export namespace PubSub {
/** Response sent via the `n8n.worker-response` pubsub channel. */
export type WorkerResponse =
| WorkerResponse.RestartEventBus
| WorkerResponse.ReloadExternalSecretsProviders
| WorkerResponse.GetWorkerId
| WorkerResponse.GetWorkerStatus;
| WorkerResponses.RestartEventBus
| WorkerResponses.ReloadExternalSecretsProviders
| WorkerResponses.GetWorkerId
| WorkerResponses.GetWorkerStatus;
}

View file

@ -21,7 +21,7 @@ import { CredentialsOverwrites } from '@/credentials-overwrites';
import { ControllerRegistry } from '@/decorators';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import type { ICredentialsOverwrite } from '@/interfaces';
import { isLdapEnabled } from '@/ldap/helpers.ee';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';

View file

@ -3,9 +3,9 @@ import type { WorkflowActivateMode } from 'n8n-workflow';
import Container, { Service } from 'typedi';
import config from '@/config';
import type { PubSubCommandMap } from '@/events/maps/pub-sub.event-map';
import { Logger } from '@/logger';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { PubSub } from '@/scaling/pubsub/pubsub.types';
import type { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee';
@ -97,9 +97,9 @@ export class OrchestrationService {
// pubsub
// ----------------------------------
async publish<CommandKey extends keyof PubSub.CommandMap>(
async publish<CommandKey extends keyof PubSubCommandMap>(
commandKey: CommandKey,
payload?: PubSub.CommandMap[CommandKey],
payload?: PubSubCommandMap[CommandKey],
) {
if (!this.sanityCheck()) return;

View file

@ -40,6 +40,7 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map';
import { ExternalHooks } from '@/external-hooks';
import type {
IWorkflowExecuteProcess,
@ -53,7 +54,6 @@ import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import * as WorkflowHelpers from '@/workflow-helpers';
import { WorkflowRepository } from './databases/repositories/workflow.repository';
import type { AiEventMap, AiEventPayload } from './events/ai-event-map';
import { EventService } from './events/event.service';
import { restoreBinaryDataId } from './execution-lifecycle-hooks/restore-binary-data-id';
import { saveExecutionProgress } from './execution-lifecycle-hooks/save-execution-progress';

View file

@ -5,7 +5,7 @@ import { BinaryDataService } from 'n8n-core';
import { Worker } from '@/commands/worker';
import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { ExternalHooks } from '@/external-hooks';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';

View file

@ -4,7 +4,7 @@ import type { Class } from 'n8n-core';
import type { BaseCommand } from '@/commands/base-command';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import { mockInstance } from '@test/mocking';
import * as testDb from '../test-db';