refactor(core): Merge event bus controllers and remove dead code (no-changelog) (#9688)

This commit is contained in:
Iván Ovejero 2024-06-10 17:38:02 +02:00 committed by GitHub
parent 2521daadfc
commit 817167cf4b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 113 additions and 274 deletions

View file

@ -6,7 +6,6 @@ export type Resource =
| 'credential'
| 'externalSecretsProvider'
| 'externalSecret'
| 'eventBusEvent'
| 'eventBusDestination'
| 'ldap'
| 'license'
@ -45,7 +44,6 @@ export type EventBusDestinationScope = ResourceScope<
'eventBusDestination',
DefaultOperations | 'test'
>;
export type EventBusEventScope = ResourceScope<'eventBusEvent', DefaultOperations | 'query'>;
export type LdapScope = ResourceScope<'ldap', 'manage' | 'sync'>;
export type LicenseScope = ResourceScope<'license', 'manage'>;
export type LogStreamingScope = ResourceScope<'logStreaming', 'manage'>;
@ -70,7 +68,6 @@ export type Scope =
| CredentialScope
| ExternalSecretProviderScope
| ExternalSecretScope
| EventBusEventScope
| EventBusDestinationScope
| LdapScope
| LicenseScope

View file

@ -47,7 +47,6 @@ import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import * as ResponseHelper from '@/ResponseHelper';
import { EventBusController } from '@/eventbus/eventBus.controller';
import { EventBusControllerEE } from '@/eventbus/eventBus.controller.ee';
import { LicenseController } from '@/license/license.controller';
import { setupPushServer, setupPushHandler } from '@/push';
import { isLdapEnabled } from './Ldap/helpers';
@ -119,7 +118,6 @@ export class Server extends AbstractServer {
const controllers: Array<Class<object>> = [
EventBusController,
EventBusControllerEE,
AuthController,
LicenseController,
OAuth1CredentialController,

View file

@ -97,15 +97,6 @@ export class MessageEventBusLogWriter {
}
}
/**
* Pauses all logging. Events are still received by the worker, they just are not logged any more
*/
async pauseLogging() {
if (this.worker) {
this.worker.postMessage({ command: 'pauseLogging', data: {} });
}
}
startRecoveryProcess() {
if (this.worker) {
this.worker.postMessage({ command: 'startRecoveryProcess', data: {} });

View file

@ -103,10 +103,6 @@ if (!isMainThread) {
appendMessageSync(data);
parentPort?.postMessage({ command, data: true });
break;
case 'pauseLogging':
loggingPaused = true;
clearInterval(fileStatTimer);
break;
case 'initialize':
const settings: MessageEventBusLogWriterOptions = {
logFullBasePath: (data as MessageEventBusLogWriterOptions).logFullBasePath ?? '',

View file

@ -1,138 +0,0 @@
import express from 'express';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import { RestController, Get, Post, Delete, GlobalScope, Licensed } from '@/decorators';
import { AuthenticatedRequest } from '@/requests';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { MessageEventBus } from './MessageEventBus/MessageEventBus';
import {
isMessageEventBusDestinationSentryOptions,
MessageEventBusDestinationSentry,
} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee';
import {
isMessageEventBusDestinationSyslogOptions,
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
// ----------------------------------------
// TypeGuards
// ----------------------------------------
const isWithIdString = (candidate: unknown): candidate is { id: string } => {
const o = candidate as { id: string };
if (!o) return false;
return o.id !== undefined;
};
const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
const o = candidate as MessageEventBusDestinationWebhookOptions;
if (!o) return false;
return o.url !== undefined;
};
const isMessageEventBusDestinationOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationOptions => {
const o = candidate as MessageEventBusDestinationOptions;
if (!o) return false;
return o.__type !== undefined;
};
// ----------------------------------------
// Controller
// ----------------------------------------
@RestController('/eventbus')
export class EventBusControllerEE {
constructor(private readonly eventBus: MessageEventBus) {}
// ----------------------------------------
// Destinations
// ----------------------------------------
@Licensed('feat:logStreaming')
@Get('/destination')
@GlobalScope('eventBusDestination:list')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
return await this.eventBus.findDestination(req.query.id);
} else {
return await this.eventBus.findDestination();
}
}
@Licensed('feat:logStreaming')
@Post('/destination')
@GlobalScope('eventBusDestination:create')
async postDestination(req: AuthenticatedRequest): Promise<any> {
let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
result = await this.eventBus.addDestination(
new MessageEventBusDestinationSentry(this.eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.webhook:
if (isMessageEventBusDestinationWebhookOptions(req.body)) {
result = await this.eventBus.addDestination(
new MessageEventBusDestinationWebhook(this.eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.syslog:
if (isMessageEventBusDestinationSyslogOptions(req.body)) {
result = await this.eventBus.addDestination(
new MessageEventBusDestinationSyslog(this.eventBus, req.body),
);
}
break;
default:
throw new BadRequestError(
`Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`,
);
}
if (result) {
await result.saveToDb();
return {
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}
@Licensed('feat:logStreaming')
@Get('/testmessage')
@GlobalScope('eventBusDestination:test')
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
return await this.eventBus.testDestination(req.query.id);
}
return false;
}
@Licensed('feat:logStreaming')
@Delete('/destination')
@GlobalScope('eventBusDestination:delete')
async deleteDestination(req: AuthenticatedRequest) {
if (isWithIdString(req.query)) {
return await this.eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}
}

View file

@ -1,112 +1,132 @@
import { eventNamesAll } from './EventMessageClasses';
import express from 'express';
import { EventMessageTypeNames } from 'n8n-workflow';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import { RestController, Get, Post, GlobalScope } from '@/decorators';
import { RestController, Get, Post, Delete, GlobalScope, Licensed } from '@/decorators';
import { AuthenticatedRequest } from '@/requests';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { isEventMessageOptions } from './EventMessageClasses/AbstractEventMessage';
import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric';
import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMessageWorkflow';
import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow';
import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus';
import { MessageEventBus } from './MessageEventBus/MessageEventBus';
import type { EventMessageTypes } from './EventMessageClasses';
import { eventNamesAll } from './EventMessageClasses';
import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit';
import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit';
import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode';
import { EventMessageNode } from './EventMessageClasses/EventMessageNode';
import {
isMessageEventBusDestinationSentryOptions,
MessageEventBusDestinationSentry,
} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee';
import {
isMessageEventBusDestinationSyslogOptions,
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
// ----------------------------------------
// TypeGuards
// ----------------------------------------
const isWithQueryString = (candidate: unknown): candidate is { query: string } => {
const o = candidate as { query: string };
const isWithIdString = (candidate: unknown): candidate is { id: string } => {
const o = candidate as { id: string };
if (!o) return false;
return o.query !== undefined;
return o.id !== undefined;
};
// ----------------------------------------
// Controller
// ----------------------------------------
const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
const o = candidate as MessageEventBusDestinationWebhookOptions;
if (!o) return false;
return o.url !== undefined;
};
const isMessageEventBusDestinationOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationOptions => {
const o = candidate as MessageEventBusDestinationOptions;
if (!o) return false;
return o.__type !== undefined;
};
@RestController('/eventbus')
export class EventBusController {
constructor(private readonly eventBus: MessageEventBus) {}
// ----------------------------------------
// Events
// ----------------------------------------
@Get('/event')
@GlobalScope('eventBusEvent:query')
async getEvents(
req: express.Request,
): Promise<EventMessageTypes[] | Record<string, EventMessageTypes[]>> {
if (isWithQueryString(req.query)) {
switch (req.query.query as EventMessageReturnMode) {
case 'sent':
return await this.eventBus.getEventsSent();
case 'unsent':
return await this.eventBus.getEventsUnsent();
case 'unfinished':
return await this.eventBus.getUnfinishedExecutions();
case 'all':
default:
return await this.eventBus.getEventsAll();
}
} else {
return await this.eventBus.getEventsAll();
}
}
@Get('/execution/:id')
@GlobalScope('eventBusEvent:read')
async getEventForExecutionId(req: express.Request): Promise<EventMessageTypes[] | undefined> {
if (req.params?.id) {
let logHistory;
if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10);
}
return await this.eventBus.getEventsByExecutionId(req.params.id, logHistory);
}
return;
}
@Post('/event')
@GlobalScope('eventBusEvent:create')
async postEvent(req: express.Request): Promise<EventMessageTypes | undefined> {
let msg: EventMessageTypes | undefined;
if (isEventMessageOptions(req.body)) {
switch (req.body.__type) {
case EventMessageTypeNames.workflow:
msg = new EventMessageWorkflow(req.body as EventMessageWorkflowOptions);
break;
case EventMessageTypeNames.audit:
msg = new EventMessageAudit(req.body as EventMessageAuditOptions);
break;
case EventMessageTypeNames.node:
msg = new EventMessageNode(req.body as EventMessageNodeOptions);
break;
case EventMessageTypeNames.generic:
default:
msg = new EventMessageGeneric(req.body);
}
await this.eventBus.send(msg);
} else {
throw new BadRequestError(
'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}',
);
}
return msg;
}
// ----------------------------------------
// Utilities
// ----------------------------------------
@Get('/eventnames')
async getEventNames(): Promise<string[]> {
return eventNamesAll;
}
@Licensed('feat:logStreaming')
@Get('/destination')
@GlobalScope('eventBusDestination:list')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
return await this.eventBus.findDestination(req.query.id);
} else {
return await this.eventBus.findDestination();
}
}
@Licensed('feat:logStreaming')
@Post('/destination')
@GlobalScope('eventBusDestination:create')
async postDestination(req: AuthenticatedRequest): Promise<any> {
let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
result = await this.eventBus.addDestination(
new MessageEventBusDestinationSentry(this.eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.webhook:
if (isMessageEventBusDestinationWebhookOptions(req.body)) {
result = await this.eventBus.addDestination(
new MessageEventBusDestinationWebhook(this.eventBus, req.body),
);
}
break;
case MessageEventBusDestinationTypeNames.syslog:
if (isMessageEventBusDestinationSyslogOptions(req.body)) {
result = await this.eventBus.addDestination(
new MessageEventBusDestinationSyslog(this.eventBus, req.body),
);
}
break;
default:
throw new BadRequestError(
`Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`,
);
}
if (result) {
await result.saveToDb();
return {
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}
@Licensed('feat:logStreaming')
@Get('/testmessage')
@GlobalScope('eventBusDestination:test')
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
return await this.eventBus.testDestination(req.query.id);
}
return false;
}
@Licensed('feat:logStreaming')
@Delete('/destination')
@GlobalScope('eventBusDestination:delete')
async deleteDestination(req: AuthenticatedRequest) {
if (isWithIdString(req.query)) {
return await this.eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}
}

View file

@ -14,12 +14,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [
'communityPackage:uninstall',
'communityPackage:update',
'communityPackage:list',
'eventBusEvent:create',
'eventBusEvent:read',
'eventBusEvent:update',
'eventBusEvent:delete',
'eventBusEvent:query',
'eventBusEvent:create',
'eventBusDestination:create',
'eventBusDestination:read',
'eventBusDestination:update',
@ -81,7 +75,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [
export const GLOBAL_ADMIN_SCOPES = GLOBAL_OWNER_SCOPES.concat();
export const GLOBAL_MEMBER_SCOPES: Scope[] = [
'eventBusEvent:read',
'eventBusDestination:list',
'eventBusDestination:test',
'tag:create',

View file

@ -158,9 +158,7 @@ export const setupTestServer = ({
case 'eventBus':
const { EventBusController } = await import('@/eventbus/eventBus.controller');
const { EventBusControllerEE } = await import('@/eventbus/eventBus.controller.ee');
registerController(app, EventBusController);
registerController(app, EventBusControllerEE);
break;
case 'auth':

View file

@ -47,7 +47,3 @@ export async function getDestinationsFromBackend(
): Promise<MessageEventBusDestinationOptions[]> {
return await makeRestApiRequest(context, 'GET', '/eventbus/destination');
}
export async function getExecutionEvents(context: IRestApiContext, executionId: string) {
return await makeRestApiRequest(context, 'GET', `/eventbus/execution/${executionId}`);
}

View file

@ -24,7 +24,6 @@ export const useRBACStore = defineStore(STORES.RBAC, () => {
orchestration: {},
workersView: {},
eventBusDestination: {},
eventBusEvent: {},
auditLogs: {},
banner: {},
communityPackage: {},

View file

@ -34,7 +34,6 @@ import type {
} from '@/Interface';
import { defineStore } from 'pinia';
import type {
IAbstractEventMessage,
IConnection,
IConnections,
IDataObject,
@ -1432,15 +1431,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
});
}
async function getExecutionEvents(id: string): Promise<IAbstractEventMessage[]> {
const rootStore = useRootStore();
return await makeRestApiRequest(
rootStore.getRestApiContext,
'GET',
`/eventbus/execution/${id}`,
);
}
function getBinaryUrl(
binaryDataId: string,
action: 'view' | 'download',
@ -1651,7 +1641,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
fetchExecutionDataById,
deleteExecution,
addToCurrentExecutions,
getExecutionEvents,
getBinaryUrl,
setNodePristine,
resetChatMessages,