/* eslint-disable @typescript-eslint/restrict-template-expressions */ import { Container } from 'typedi'; import type { DataSourceOptions as ConnectionOptions, EntityManager, LoggerOptions } from 'typeorm'; import { DataSource as Connection } from 'typeorm'; import type { TlsOptions } from 'tls'; import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; import config from '@/config'; import { entities } from '@db/entities'; import { getMariaDBConnectionOptions, getMysqlConnectionOptions, getOptionOverrides, getPostgresConnectionOptions, getSqliteConnectionOptions, } from '@db/config'; import { inTest } from '@/constants'; import { wrapMigration } from '@db/utils/migrationHelpers'; import type { DatabaseType, Migration } from '@db/types'; let connection: Connection; export const getConnection = () => connection!; type ConnectionState = { connected: boolean; migrated: boolean; }; export const connectionState: ConnectionState = { connected: false, migrated: false, }; // Ping DB connection every 2 seconds let pingTimer: NodeJS.Timer | undefined; if (!inTest) { const pingDBFn = async () => { if (connection?.isInitialized) { try { await connection.query('SELECT 1'); connectionState.connected = true; return; } catch (error) { ErrorReporter.error(error); } finally { pingTimer = setTimeout(pingDBFn, 2000); } } connectionState.connected = false; }; pingTimer = setTimeout(pingDBFn, 2000); } export async function transaction(fn: (entityManager: EntityManager) => Promise): Promise { return await connection.transaction(fn); } export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions { switch (dbType) { case 'postgresdb': const sslCa = config.getEnv('database.postgresdb.ssl.ca'); const sslCert = config.getEnv('database.postgresdb.ssl.cert'); const sslKey = config.getEnv('database.postgresdb.ssl.key'); const sslRejectUnauthorized = config.getEnv('database.postgresdb.ssl.rejectUnauthorized'); let ssl: TlsOptions | boolean = config.getEnv('database.postgresdb.ssl.enabled'); if (sslCa !== '' || sslCert !== '' || sslKey !== '' || !sslRejectUnauthorized) { ssl = { ca: sslCa || undefined, cert: sslCert || undefined, key: sslKey || undefined, rejectUnauthorized: sslRejectUnauthorized, }; } return { ...getPostgresConnectionOptions(), ...getOptionOverrides('postgresdb'), ssl, }; case 'mariadb': case 'mysqldb': return { ...(dbType === 'mysqldb' ? getMysqlConnectionOptions() : getMariaDBConnectionOptions()), ...getOptionOverrides('mysqldb'), timezone: 'Z', // set UTC as default }; case 'sqlite': return getSqliteConnectionOptions(); default: throw new ApplicationError('Database type currently not supported', { extra: { dbType } }); } } export async function setSchema(conn: Connection) { const schema = config.getEnv('database.postgresdb.schema'); const searchPath = ['public']; if (schema !== 'public') { await conn.query(`CREATE SCHEMA IF NOT EXISTS ${schema}`); searchPath.unshift(schema); } await conn.query(`SET search_path TO ${searchPath.join(',')};`); } export async function init(testConnectionOptions?: ConnectionOptions): Promise { if (connectionState.connected) return; const dbType = config.getEnv('database.type'); const connectionOptions = testConnectionOptions ?? getConnectionOptions(dbType); let loggingOption: LoggerOptions = config.getEnv('database.logging.enabled'); if (loggingOption) { const optionsString = config.getEnv('database.logging.options').replace(/\s+/g, ''); if (optionsString === 'all') { loggingOption = optionsString; } else { loggingOption = optionsString.split(',') as LoggerOptions; } } const maxQueryExecutionTime = config.getEnv('database.logging.maxQueryExecutionTime'); Object.assign(connectionOptions, { entities: Object.values(entities), synchronize: false, logging: loggingOption, maxQueryExecutionTime, migrationsRun: false, }); connection = new Connection(connectionOptions); Container.set(Connection, connection); await connection.initialize(); if (dbType === 'postgresdb') { await setSchema(connection); } connectionState.connected = true; } export async function migrate() { (connection.options.migrations as Migration[]).forEach(wrapMigration); await connection.runMigrations({ transaction: 'each' }); connectionState.migrated = true; } export const close = async () => { if (pingTimer) { clearTimeout(pingTimer); pingTimer = undefined; } if (connection.isInitialized) await connection.destroy(); };