feat(core): Convert eventBus controller to decorator style and improve permissions (#5779)

This commit is contained in:
Michael Auerswald 2023-03-27 12:30:03 +02:00 committed by GitHub
parent dd20127961
commit f15f4bdcf2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 150 additions and 96 deletions

View file

@ -129,7 +129,7 @@ import { WaitTracker } from '@/WaitTracker';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { toHttpNodeParameters } from '@/CurlConverterHelper';
import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { EventBusController } from '@/eventbus/eventBus.controller';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { licenseController } from './license/license.controller';
import { Push, setupPushServer, setupPushHandler } from '@/push';
@ -377,6 +377,7 @@ class Server extends AbstractServer {
const samlService = Container.get(SamlService);
const controllers: object[] = [
new EventBusController(),
new AuthController({ config, internalHooks, repositories, logger, postHog }),
new OwnerController({ config, internalHooks, repositories, logger }),
new MeController({ externalHooks, internalHooks, repositories, logger }),
@ -1229,8 +1230,6 @@ class Server extends AbstractServer {
if (!eventBus.isInitialized) {
await eventBus.initialize();
}
// add Event Bus REST endpoints
this.app.use(`/${this.restEndpoint}/eventbus`, eventBusRouter);
// ----------------------------------------
// Webhooks

View file

@ -49,3 +49,11 @@ export type EventMessageTypes =
| EventMessageWorkflow
| EventMessageAudit
| EventMessageNode;
export interface FailedEventSummary {
lastNodeExecuted: string;
executionId: string;
name: string;
event: string;
timestamp: string;
}

View file

@ -1,7 +1,11 @@
import { LoggerProxy } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm';
import type { EventMessageTypes } from '../EventMessageClasses/';
import type {
EventMessageTypes,
EventNamesTypes,
FailedEventSummary,
} from '../EventMessageClasses/';
import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee';
import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter';
import EventEmitter from 'events';
@ -249,6 +253,48 @@ export class MessageEventBus extends EventEmitter {
);
}
async getEventsFailed(amount = 5): Promise<FailedEventSummary[]> {
const result: FailedEventSummary[] = [];
try {
const queryResult = await this.logWriter?.getMessagesAll();
const uniques = uniqby(queryResult, 'id');
const filteredExecutionIds = uniques
.filter((e) =>
(['n8n.workflow.crashed', 'n8n.workflow.failed'] as EventNamesTypes[]).includes(
e.eventName,
),
)
.map((e) => ({
executionId: e.payload.executionId as string,
name: e.payload.workflowName,
timestamp: e.ts,
event: e.eventName,
}))
.filter((e) => e)
.sort((a, b) => (a.timestamp > b.timestamp ? 1 : -1))
.slice(-amount);
for (const execution of filteredExecutionIds) {
const data = await recoverExecutionDataFromEventLogMessages(
execution.executionId,
queryResult,
false,
);
if (data) {
const lastNodeExecuted = data.resultData.lastNodeExecuted;
result.push({
lastNodeExecuted: lastNodeExecuted ?? '',
executionId: execution.executionId,
name: execution.name as string,
event: execution.event,
timestamp: execution.timestamp.toISO(),
});
}
}
} catch {}
return result;
}
async getEventsAll(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesAll();
const filtered = uniqby(queryResult, 'id');

View file

@ -17,22 +17,26 @@ import {
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses';
import { eventNamesAll } from './EventMessageClasses';
import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit';
import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit';
import { BadRequestError } from '../ResponseHelper';
import { BadRequestError } from '@/ResponseHelper';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
IRunExecutionData,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames, EventMessageTypeNames } from 'n8n-workflow';
import type { User } from '../databases/entities/User';
import type { User } from '@db/entities/User';
import * as ResponseHelper from '@/ResponseHelper';
import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode';
import { EventMessageNode } from './EventMessageClasses/EventMessageNode';
import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents';
export const eventBusRouter = express.Router();
import { RestController, Get, Post, Delete } from '@/decorators';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
import { isOwnerMiddleware } from '../middlewares/isOwner';
import type { DeleteResult } from 'typeorm';
// ----------------------------------------
// TypeGuards
@ -50,7 +54,6 @@ const isWithQueryString = (candidate: unknown): candidate is { query: string } =
return o.query !== undefined;
};
// TODO: add credentials
const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
@ -67,12 +70,19 @@ const isMessageEventBusDestinationOptions = (
return o.__type !== undefined;
};
// ----------------------------------------
// Controller
// ----------------------------------------
@RestController('/eventbus')
export class EventBusController {
// ----------------------------------------
// Events
// ----------------------------------------
eventBusRouter.get(
'/event',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/event', { middlewares: [isOwnerMiddleware] })
async getEvents(
req: express.Request,
): Promise<EventMessageTypes[] | Record<string, EventMessageTypes[]>> {
if (isWithQueryString(req.query)) {
switch (req.query.query as EventMessageReturnMode) {
case 'sent':
@ -85,14 +95,19 @@ eventBusRouter.get(
default:
return eventBus.getEventsAll();
}
}
} else {
return eventBus.getEventsAll();
}),
);
}
}
eventBusRouter.get(
'/execution/:id',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/failed')
async getFailedEvents(req: express.Request): Promise<FailedEventSummary[]> {
const amount = parseInt(req.query?.amount as string) ?? 5;
return eventBus.getEventsFailed(amount);
}
@Get('/execution/:id')
async getEventForExecutionId(req: express.Request): Promise<EventMessageTypes[] | undefined> {
if (req.params?.id) {
let logHistory;
if (req.query?.logHistory) {
@ -100,40 +115,27 @@ eventBusRouter.get(
}
return eventBus.getEventsByExecutionId(req.params.id, logHistory);
}
}),
);
return;
}
eventBusRouter.get(
'/execution-recover/:id',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/execution-recover/:id')
async getRecoveryForExecutionId(req: express.Request): Promise<IRunExecutionData | undefined> {
const { id } = req.params;
if (req.params?.id) {
let logHistory;
let applyToDb = true;
if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10);
}
if (req.query?.applyToDb) {
applyToDb = !!req.query.applyToDb;
}
const messages = await eventBus.getEventsByExecutionId(req.params.id, logHistory);
const logHistory = parseInt(req.query.logHistory as string, 10) || undefined;
const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true;
const messages = await eventBus.getEventsByExecutionId(id, logHistory);
if (messages.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const recoverResult = await recoverExecutionDataFromEventLogMessages(
req.params.id,
messages,
applyToDb,
);
return recoverResult;
return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb);
}
}
}),
);
return;
}
eventBusRouter.post(
'/event',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Post('/event', { middlewares: [isOwnerMiddleware] })
async postEvent(req: express.Request): Promise<EventMessageTypes | undefined> {
let msg: EventMessageTypes | undefined;
if (isEventMessageOptions(req.body)) {
let msg;
switch (req.body.__type) {
case EventMessageTypeNames.workflow:
msg = new EventMessageWorkflow(req.body as EventMessageWorkflowOptions);
@ -154,35 +156,30 @@ eventBusRouter.post(
'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}',
);
}
}),
);
return msg;
}
// ----------------------------------------
// Destinations
// ----------------------------------------
eventBusRouter.get(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
let result = [];
@Get('/destination')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
result = await eventBus.findDestination(req.query.id);
return eventBus.findDestination(req.query.id);
} else {
result = await eventBus.findDestination();
return eventBus.findDestination();
}
}
return result;
}),
);
eventBusRouter.post(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Post('/destination', { middlewares: [isOwnerMiddleware] })
async postDestination(req: express.Request): Promise<any> {
if (!req.user || (req.user as User).globalRole.name !== 'owner') {
throw new ResponseHelper.UnauthorizedError('Invalid request');
}
let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
let result;
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
@ -214,51 +211,41 @@ eventBusRouter.post(
if (result) {
await result.saveToDb();
return {
...result,
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}),
);
eventBusRouter.get(
'/testmessage',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
let result = false;
if (isWithIdString(req.query)) {
result = await eventBus.testDestination(req.query.id);
}
return result;
}),
);
eventBusRouter.delete(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/testmessage')
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
return eventBus.testDestination(req.query.id);
}
return false;
}
@Delete('/destination', { middlewares: [isOwnerMiddleware] })
async deleteDestination(req: express.Request): Promise<DeleteResult | undefined> {
if (!req.user || (req.user as User).globalRole.name !== 'owner') {
throw new ResponseHelper.UnauthorizedError('Invalid request');
}
if (isWithIdString(req.query)) {
const result = await eventBus.removeDestination(req.query.id);
if (result) {
return result;
}
return eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}),
);
}
// ----------------------------------------
// Utilities
// ----------------------------------------
eventBusRouter.get(
'/eventnames',
ResponseHelper.send(async (): Promise<any> => {
@Get('/eventnames')
async getEventNames(): Promise<string[]> {
return eventNamesAll;
}),
);
}
}

View file

@ -0,0 +1,12 @@
import type { RequestHandler } from 'express';
import { LoggerProxy } from 'n8n-workflow';
import type { AuthenticatedRequest } from '@/requests';
export const isOwnerMiddleware: RequestHandler = (req: AuthenticatedRequest, res, next) => {
if (req.user.globalRole.name === 'owner') {
next();
} else {
LoggerProxy.debug('Request failed because user is not owner');
res.status(401).send('Unauthorized');
}
};

View file

@ -100,7 +100,7 @@ beforeAll(async () => {
utils.initConfigFile();
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
config.set('eventBus.logWriter.keepLogCount', '1');
config.set('eventBus.logWriter.keepLogCount', 1);
config.set('enterprise.features.logStreaming', true);
config.set('userManagement.disabled', false);
config.set('userManagement.isInstanceOwnerSetUp', true);

View file

@ -56,7 +56,6 @@ import type {
PostgresSchemaSection,
} from './types';
import { licenseController } from '@/license/license.controller';
import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { registerController } from '@/decorators';
import {
AuthController,
@ -81,6 +80,7 @@ import { Push } from '@/push';
import { setSamlLoginEnabled } from '@/sso/saml/samlHelpers';
import { SamlService } from '@/sso/saml/saml.service.ee';
import { SamlController } from '@/sso/saml/routes/saml.controller.ee';
import { EventBusController } from '@/eventbus/eventBus.controller';
export const mockInstance = <T>(
ctor: new (...args: any[]) => T,
@ -151,7 +151,6 @@ export async function initTestServer({
credentials: { controller: credentialsController, path: 'credentials' },
workflows: { controller: workflowsController, path: 'workflows' },
license: { controller: licenseController, path: 'license' },
eventBus: { controller: eventBusRouter, path: 'eventbus' },
};
if (enablePublicAPI) {
@ -176,6 +175,9 @@ export async function initTestServer({
for (const group of functionEndpoints) {
switch (group) {
case 'eventBus':
registerController(testServer.app, config, new EventBusController());
break;
case 'auth':
registerController(
testServer.app,
@ -266,7 +268,7 @@ const classifyEndpointGroups = (endpointGroups: EndpointGroup[]) => {
const routerEndpoints: EndpointGroup[] = [];
const functionEndpoints: EndpointGroup[] = [];
const ROUTER_GROUP = ['credentials', 'workflows', 'publicApi', 'eventBus', 'license'];
const ROUTER_GROUP = ['credentials', 'workflows', 'publicApi', 'license'];
endpointGroups.forEach((group) =>
(ROUTER_GROUP.includes(group) ? routerEndpoints : functionEndpoints).push(group),