n8n/packages/cli/src/Db.ts
Michael Auerswald b67f803cbe
feat: Add global event bus (#4860)
* fix branch

* fix deserialize, add filewriter

* add catchAll eventGroup/Name

* adding simple Redis sender and receiver to eventbus

* remove native node threads

* improve eventbus

* refactor and simplify

* more refactoring and syslog client

* more refactor, improved endpoints and eventbus

* remove local broker and receivers from mvp

* destination de/serialization

* create MessageEventBusDestinationEntity

* db migrations, load destinations at startup

* add delete destination endpoint

* pnpm merge and circular import fix

* delete destination fix

* trigger log file shuffle after size reached

* add environment variables for eventbus

* reworking event messages

* serialize to thread fix

* some refactor and lint fixing

* add emit to eventbus

* cleanup and fix sending unsent

* quicksave frontend trial

* initial EventTree vue component

* basic log streaming settings in vue

* http request code merge

* create destination settings modals

* fix eventmessage options types

* credentials are loaded

* fix and clean up frontend code

* move request code to axios

* update lock file

* merge fix

* fix redis build

* move destination interfaces into workflow pkg

* revive sentry as destination

* migration fixes and frontend cleanup

* N8N-5777 / N8N-5789 N8N-5788

* N8N-5784

* N8N-5782 removed event levels

* N8N-5790 sentry destination cleanup

* N8N-5786 and refactoring

* N8N-5809 and refactor/cleanup

* UI fixes and anonymize renaming

* N8N-5837

* N8N-5834

* fix no-items UI issues

* remove card / settings label in modal

* N8N-5842 fix

* disable webhook auth for now and update ui

* change sidebar to tabs

* remove payload option

* extend audit events with more user data

* N8N-5853 and UI revert to sidebar

* remove redis destination

* N8N-5864 / N8N-5868 / N8N-5867 / N8N-5865

* ui and licensing fixes

* add node events and info bubbles to frontend

* ui wording changes

* frontend tests

* N8N-5896 and ee rename

* improves backend tests

* merge fix

* fix backend test

* make linter happy

* remove unnecessary cfg / limit  actions to owners

* fix multiple sentry DSN and anon bug

* eslint fix

* more tests and fixes

* merge fix

* fix workflow audit events

* remove 'n8n.workflow.execution.error' event

* merge fix

* lint fix

* lint fix

* review fixes

* fix merge

* prettier fixes

* merge

* review changes

* use loggerproxy

* remove catch from internal hook promises

* fix tests

* lint fix

* include review PR changes

* review changes

* delete duplicate lines from a bad merge

* decouple log-streaming UI options from public API

* logstreaming -> log-streaming for consistency

* do not make unnecessary api calls when log streaming is disabled

* prevent sentryClient.close() from being called if init failed

* fix the e2e test for log-streaming

* review changes

* cleanup

* use `private` for one last private property

* do not use node prefix package names.. just yet

* remove unused import

* fix the tests

because there is a folder called `events`, tsc-alias is messing up all imports for native events module.
https://github.com/justkey007/tsc-alias/issues/152

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
2023-01-04 09:47:48 +01:00

189 lines
5.7 KiB
TypeScript

/* eslint-disable import/no-mutable-exports */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable no-case-declarations */
/* eslint-disable @typescript-eslint/naming-convention */
import {
Connection,
ConnectionOptions,
createConnection,
EntityManager,
EntityTarget,
getRepository,
LoggerOptions,
ObjectLiteral,
Repository,
} from 'typeorm';
import { TlsOptions } from 'tls';
import { DatabaseType, IDatabaseCollections } from '@/Interfaces';
import * as GenericHelpers from '@/GenericHelpers';
import config from '@/config';
import { entities } from '@db/entities';
import {
getMariaDBConnectionOptions,
getMysqlConnectionOptions,
getOptionOverrides,
getPostgresConnectionOptions,
getSqliteConnectionOptions,
} from '@db/config';
export let isInitialized = false;
export const collections = {} as IDatabaseCollections;
export let connection: Connection;
export async function transaction<T>(fn: (entityManager: EntityManager) => Promise<T>): Promise<T> {
return connection.transaction(fn);
}
export function linkRepository<Entity extends ObjectLiteral>(
entityClass: EntityTarget<Entity>,
): Repository<Entity> {
return getRepository(entityClass, connection.name);
}
export async function init(
testConnectionOptions?: ConnectionOptions,
): Promise<IDatabaseCollections> {
if (isInitialized) return collections;
const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
let connectionOptions: ConnectionOptions;
const entityPrefix = config.getEnv('database.tablePrefix');
if (testConnectionOptions) {
connectionOptions = testConnectionOptions;
} else {
switch (dbType) {
case 'postgresdb':
const sslCa = (await GenericHelpers.getConfigValue('database.postgresdb.ssl.ca')) as string;
const sslCert = (await GenericHelpers.getConfigValue(
'database.postgresdb.ssl.cert',
)) as string;
const sslKey = (await GenericHelpers.getConfigValue(
'database.postgresdb.ssl.key',
)) as string;
const sslRejectUnauthorized = (await GenericHelpers.getConfigValue(
'database.postgresdb.ssl.rejectUnauthorized',
)) as boolean;
let ssl: TlsOptions | undefined;
if (sslCa !== '' || sslCert !== '' || sslKey !== '' || !sslRejectUnauthorized) {
ssl = {
ca: sslCa || undefined,
cert: sslCert || undefined,
key: sslKey || undefined,
rejectUnauthorized: sslRejectUnauthorized,
};
}
connectionOptions = {
...getPostgresConnectionOptions(),
...(await getOptionOverrides('postgresdb')),
ssl,
};
break;
case 'mariadb':
case 'mysqldb':
connectionOptions = {
...(dbType === 'mysqldb' ? getMysqlConnectionOptions() : getMariaDBConnectionOptions()),
...(await getOptionOverrides('mysqldb')),
timezone: 'Z', // set UTC as default
};
break;
case 'sqlite':
connectionOptions = getSqliteConnectionOptions();
break;
default:
throw new Error(`The database "${dbType}" is currently not supported!`);
}
}
let loggingOption: LoggerOptions = (await GenericHelpers.getConfigValue(
'database.logging.enabled',
)) as boolean;
if (loggingOption) {
const optionsString = (
(await GenericHelpers.getConfigValue('database.logging.options')) as string
).replace(/\s+/g, '');
if (optionsString === 'all') {
loggingOption = optionsString;
} else {
loggingOption = optionsString.split(',') as LoggerOptions;
}
}
const maxQueryExecutionTime = (await GenericHelpers.getConfigValue(
'database.logging.maxQueryExecutionTime',
)) as string;
Object.assign(connectionOptions, {
entities: Object.values(entities),
synchronize: false,
logging: loggingOption,
maxQueryExecutionTime,
});
connection = await createConnection(connectionOptions);
if (!testConnectionOptions && dbType === 'sqlite') {
// This specific migration changes database metadata.
// A field is now nullable. We need to reconnect so that
// n8n knows it has changed. Happens only on sqlite.
let migrations = [];
try {
migrations = await connection.query(
`SELECT id FROM ${entityPrefix}migrations where name = "MakeStoppedAtNullable1607431743769"`,
);
} catch (error) {
// Migration table does not exist yet - it will be created after migrations run for the first time.
}
// If you remove this call, remember to turn back on the
// setting to run migrations automatically above.
await connection.runMigrations({
transaction: 'none',
});
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
if (migrations.length === 0) {
await connection.close();
connection = await createConnection(connectionOptions);
}
}
// @ts-ignore
collections.Credentials = linkRepository(entities.CredentialsEntity);
// @ts-ignore
collections.Execution = linkRepository(entities.ExecutionEntity);
collections.Workflow = linkRepository(entities.WorkflowEntity);
// @ts-ignore
collections.Webhook = linkRepository(entities.WebhookEntity);
collections.Tag = linkRepository(entities.TagEntity);
collections.Role = linkRepository(entities.Role);
collections.User = linkRepository(entities.User);
collections.SharedCredentials = linkRepository(entities.SharedCredentials);
collections.SharedWorkflow = linkRepository(entities.SharedWorkflow);
collections.Settings = linkRepository(entities.Settings);
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
collections.EventDestinations = linkRepository(entities.EventDestinations);
isInitialized = true;
return collections;
}