refactor(core): Use an IoC container to manage singleton classes [Part-2] (no-changelog) (#5690)

* use typedi for UserManagementMailer

* use typedi for SamlService

* fix typos

* use typedi for Queue

* use typedi for License

* convert some more code to use typedi
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-03-16 15:34:13 +01:00 committed by GitHub
parent c07f838ce6
commit 9bd7529193
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 154 additions and 178 deletions

View file

@ -302,7 +302,7 @@ export abstract class AbstractServer {
// ----------------------------------------
protected setupWaitingWebhookEndpoint() {
const endpoint = this.endpointWebhookWaiting;
const waitingWebhooks = new WaitingWebhooks();
const waitingWebhooks = Container.get(WaitingWebhooks);
// Register all webhook-waiting requests
this.app.all(`/${endpoint}/*`, async (req, res) => {

View file

@ -1,7 +1,4 @@
/* eslint-disable prefer-template */
/* eslint-disable @typescript-eslint/restrict-plus-operands */
/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */
/* eslint-disable no-param-reassign */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
@ -25,7 +22,7 @@ import type {
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { isWorkflowIdValid } from '@/utils';
import { Service } from 'typedi';
@Service()
@ -60,7 +57,7 @@ export class ActiveExecutions {
}
const workflowId = executionData.workflowData.id;
if (workflowId !== undefined && WorkflowHelpers.isWorkflowIdValid(workflowId)) {
if (workflowId !== undefined && isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}

View file

@ -9,7 +9,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container, Service } from 'typedi';
import { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import type {
@ -82,7 +82,11 @@ export class ActiveWorkflowRunner {
[key: string]: IQueuedWorkflowActivations;
} = {};
constructor(private externalHooks: ExternalHooks) {}
constructor(
private activeExecutions: ActiveExecutions,
private externalHooks: ExternalHooks,
private nodeTypes: NodeTypes,
) {}
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async init() {
@ -271,14 +275,13 @@ export class ActiveWorkflowRunner {
);
}
const nodeTypes = Container.get(NodeTypes);
const workflow = new Workflow({
id: webhook.workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
@ -514,14 +517,13 @@ export class ActiveWorkflowRunner {
throw new Error(`Could not find workflow with id "${workflowId}"`);
}
const nodeTypes = Container.get(NodeTypes);
const workflow = new Workflow({
id: workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
@ -638,7 +640,7 @@ export class ActiveWorkflowRunner {
if (donePromise) {
executePromise.then((executionId) => {
Container.get(ActiveExecutions)
this.activeExecutions
.getPostExecutePromise(executionId)
.then(donePromise.resolve)
.catch(donePromise.reject);
@ -695,7 +697,7 @@ export class ActiveWorkflowRunner {
if (donePromise) {
executePromise.then((executionId) => {
Container.get(ActiveExecutions)
this.activeExecutions
.getPostExecutePromise(executionId)
.then(donePromise.resolve)
.catch(donePromise.reject);
@ -782,14 +784,13 @@ export class ActiveWorkflowRunner {
if (!workflowData) {
throw new Error(`Could not find workflow with id "${workflowId}".`);
}
const nodeTypes = Container.get(NodeTypes);
workflowInstance = new Workflow({
id: workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});

View file

@ -2,6 +2,7 @@
import { AES, enc } from 'crypto-js';
import type { Entry as LdapUser } from 'ldapts';
import { Filter } from 'ldapts/filters/Filter';
import { Container } from 'typedi';
import { UserSettings } from 'n8n-core';
import { validate } from 'jsonschema';
import * as Db from '@/Db';
@ -23,15 +24,14 @@ import {
} from './constants';
import type { ConnectionSecurity, LdapConfig } from './types';
import { jsonParse, LoggerProxy as Logger } from 'n8n-workflow';
import { getLicense } from '@/License';
import { Container } from 'typedi';
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
/**
* Check whether the LDAP feature is disabled in the instance
*/
export const isLdapEnabled = (): boolean => {
const license = getLicense();
const license = Container.get(License);
return isUserManagementEnabled() && (config.getEnv(LDAP_ENABLED) || license.isLdapEnabled());
};

View file

@ -5,6 +5,7 @@ import { getLogger } from './Logger';
import config from '@/config';
import * as Db from '@/Db';
import { LICENSE_FEATURES, N8N_VERSION, SETTINGS_LICENSE_CERT_KEY } from './constants';
import { Service } from 'typedi';
async function loadCertStr(): Promise<TLicenseContainerStr> {
const databaseSettings = await Db.collections.Settings.findOne({
@ -27,6 +28,7 @@ async function saveCertStr(value: TLicenseContainerStr): Promise<void> {
);
}
@Service()
export class License {
private logger: ILogger;
@ -160,13 +162,3 @@ export class License {
return (this.getFeatureValue('planName') ?? 'Community') as string;
}
}
let licenseInstance: License | undefined;
export function getLicense(): License {
if (licenseInstance === undefined) {
licenseInstance = new License();
}
return licenseInstance;
}

View file

@ -7,7 +7,7 @@ import type { Role } from '@db/entities/Role';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type * as UserManagementMailer from '@/UserManagement/email/UserManagementMailer';
import type { UserManagementMailer } from '@/UserManagement/email';
import type { Risk } from '@/audit/types';
@ -26,10 +26,10 @@ export type AuthenticatedRequest<
> = express.Request<RouteParams, ResponseBody, RequestBody, RequestQuery> & {
user: User;
globalMemberRole?: Role;
mailer?: UserManagementMailer.UserManagementMailer;
mailer?: UserManagementMailer;
};
export type PaginatatedRequest = AuthenticatedRequest<
export type PaginatedRequest = AuthenticatedRequest<
{},
{},
{},

View file

@ -2,7 +2,7 @@
import type express from 'express';
import type { AuthenticatedRequest, PaginatatedRequest } from '../../../types';
import type { AuthenticatedRequest, PaginatedRequest } from '../../../types';
import { decodeCursor } from '../services/pagination.service';
export const authorize =
@ -22,7 +22,7 @@ export const authorize =
};
export const validCursor = (
req: PaginatatedRequest,
req: PaginatedRequest,
res: express.Response,
next: express.NextFunction,
): express.Response | void => {

View file

@ -1,10 +1,10 @@
import type Bull from 'bull';
import type { RedisOptions } from 'ioredis';
import { Service } from 'typedi';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers';
import { Container } from 'typedi';
export type JobId = Bull.JobId;
export type Job = Bull.Job<JobData>;
@ -24,6 +24,7 @@ export interface WebhookResponse {
response: IExecuteResponsePromiseData;
}
@Service()
export class Queue {
private jobQueue: JobQueue;
@ -91,14 +92,3 @@ export class Queue {
return false;
}
}
let activeQueueInstance: Queue | undefined;
export async function getInstance(): Promise<Queue> {
if (activeQueueInstance === undefined) {
activeQueueInstance = new Queue(Container.get(ActiveExecutions));
await activeQueueInstance.init();
}
return activeQueueInstance;
}

View file

@ -56,7 +56,7 @@ import timezones from 'google-timezones-json';
import history from 'connect-history-api-fallback';
import config from '@/config';
import * as Queue from '@/Queue';
import { Queue } from '@/Queue';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { workflowsController } from '@/workflows/workflows.controller';
@ -103,7 +103,7 @@ import {
isUserManagementEnabled,
whereClause,
} from '@/UserManagement/UserManagementHelper';
import { getInstance as getMailerInstance } from '@/UserManagement/email';
import { UserManagementMailer } from '@/UserManagement/email';
import * as Db from '@/Db';
import type {
ICredentialsDb,
@ -367,16 +367,23 @@ class Server extends AbstractServer {
const logger = LoggerProxy;
const internalHooks = Container.get(InternalHooks);
const mailer = getMailerInstance();
const mailer = Container.get(UserManagementMailer);
const postHog = this.postHog;
const samlService = SamlService.getInstance();
const samlService = Container.get(SamlService);
const controllers: object[] = [
new AuthController({ config, internalHooks, repositories, logger, postHog }),
new OwnerController({ config, internalHooks, repositories, logger }),
new MeController({ externalHooks, internalHooks, repositories, logger }),
new NodeTypesController({ config, nodeTypes }),
new PasswordResetController({ config, externalHooks, internalHooks, repositories, logger }),
new PasswordResetController({
config,
externalHooks,
internalHooks,
mailer,
repositories,
logger,
}),
new TagsController({ config, repositories, externalHooks }),
new TranslationController(config, this.credentialTypes),
new UsersController({
@ -480,6 +487,10 @@ class Server extends AbstractServer {
}),
);
if (config.getEnv('executions.mode') === 'queue') {
await Container.get(Queue).init();
}
await handleLdapInit();
this.registerControllers(ignoredEndpoints);
@ -509,7 +520,7 @@ class Server extends AbstractServer {
// set up the initial environment
if (isSamlLicensed()) {
try {
await SamlService.getInstance().init();
await Container.get(SamlService).init();
} catch (error) {
LoggerProxy.error(`SAML initialization failed: ${error.message}`);
}
@ -953,7 +964,7 @@ class Server extends AbstractServer {
ResponseHelper.send(
async (req: ExecutionRequest.GetAllCurrent): Promise<IExecutionsSummary[]> => {
if (config.getEnv('executions.mode') === 'queue') {
const queue = await Queue.getInstance();
const queue = Container.get(Queue);
const currentJobs = await queue.getJobs(['active', 'waiting']);
const currentlyRunningQueueIds = currentJobs.map((job) => job.data.executionId);
@ -1096,7 +1107,7 @@ class Server extends AbstractServer {
} as IExecutionsStopData;
}
const queue = await Queue.getInstance();
const queue = Container.get(Queue);
const currentJobs = await queue.getJobs(['active', 'waiting']);
const job = currentJobs.find((job) => job.data.executionId === req.params.id);

View file

@ -3,6 +3,7 @@
import { In } from 'typeorm';
import type express from 'express';
import { compare, genSaltSync, hash } from 'bcryptjs';
import Container from 'typedi';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
@ -13,7 +14,7 @@ import type { Role } from '@db/entities/Role';
import type { AuthenticatedRequest } from '@/requests';
import config from '@/config';
import { getWebhookBaseUrl } from '@/WebhookHelpers';
import { getLicense } from '@/License';
import { License } from '@/License';
import { RoleService } from '@/role/role.service';
import type { PostHogClient } from '@/posthog';
@ -55,7 +56,7 @@ export function isUserManagementEnabled(): boolean {
}
export function isSharingEnabled(): boolean {
const license = getLicense();
const license = Container.get(License);
return (
isUserManagementEnabled() &&
(config.getEnv('enterprise.features.sharing') || license.isSharingEnabled())

View file

@ -1,9 +1,3 @@
export interface UserManagementMailerImplementation {
init: () => Promise<void>;
sendMail: (mailData: MailData) => Promise<SendEmailResult>;
verifyConnection: () => Promise<void>;
}
export type InviteEmailData = {
email: string;
firstName?: string;

View file

@ -3,9 +3,9 @@ import type { Transporter } from 'nodemailer';
import { createTransport } from 'nodemailer';
import { ErrorReporterProxy as ErrorReporter, LoggerProxy as Logger } from 'n8n-workflow';
import config from '@/config';
import type { MailData, SendEmailResult, UserManagementMailerImplementation } from './Interfaces';
import type { MailData, SendEmailResult } from './Interfaces';
export class NodeMailer implements UserManagementMailerImplementation {
export class NodeMailer {
private transport?: Transporter;
async init(): Promise<void> {

View file

@ -2,13 +2,9 @@ import { existsSync } from 'fs';
import { readFile } from 'fs/promises';
import Handlebars from 'handlebars';
import { join as pathJoin } from 'path';
import { Service } from 'typedi';
import config from '@/config';
import type {
InviteEmailData,
PasswordResetData,
SendEmailResult,
UserManagementMailerImplementation,
} from './Interfaces';
import type { InviteEmailData, PasswordResetData, SendEmailResult } from './Interfaces';
import { NodeMailer } from './NodeMailer';
type Template = HandlebarsTemplateDelegate<unknown>;
@ -36,8 +32,9 @@ async function getTemplate(
return template;
}
@Service()
export class UserManagementMailer {
private mailer: UserManagementMailerImplementation | undefined;
private mailer: NodeMailer | undefined;
constructor() {
// Other implementations can be used in the future.
@ -81,12 +78,3 @@ export class UserManagementMailer {
return result ?? { emailSent: false };
}
}
let mailerInstance: UserManagementMailer | undefined;
export function getInstance(): UserManagementMailer {
if (mailerInstance === undefined) {
mailerInstance = new UserManagementMailer();
}
return mailerInstance;
}

View file

@ -1,3 +1,3 @@
import { getInstance, UserManagementMailer } from './UserManagementMailer';
import { UserManagementMailer } from './UserManagementMailer';
export { getInstance, UserManagementMailer };
export { UserManagementMailer };

View file

@ -10,6 +10,7 @@ import {
LoggerProxy as Logger,
WorkflowOperationError,
} from 'n8n-workflow';
import { Service } from 'typedi';
import type { FindManyOptions, ObjectLiteral } from 'typeorm';
import { Not, LessThanOrEqual } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
@ -17,7 +18,6 @@ import { DateUtils } from 'typeorm/util/DateUtils';
import config from '@/config';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
import { ActiveExecutions } from '@/ActiveExecutions';
import type {
IExecutionFlattedDb,
IExecutionsStopData,
@ -25,12 +25,9 @@ import type {
} from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container, Service } from 'typedi';
@Service()
export class WaitTracker {
activeExecutionsInstance: ActiveExecutions;
private waitingExecutions: {
[key: string]: {
executionId: string;
@ -41,8 +38,6 @@ export class WaitTracker {
mainTimer: NodeJS.Timeout;
constructor() {
this.activeExecutionsInstance = Container.get(ActiveExecutions);
// Poll every 60 seconds a list of upcoming executions
this.mainTimer = setInterval(() => {
this.getWaitingExecutions();

View file

@ -3,7 +3,7 @@
/* eslint-disable no-param-reassign */
import type { INode, WebhookHttpMethod } from 'n8n-workflow';
import { NodeHelpers, Workflow, LoggerProxy as Logger } from 'n8n-workflow';
import { Service } from 'typedi';
import type express from 'express';
import * as Db from '@/Db';
@ -13,9 +13,11 @@ import { NodeTypes } from '@/NodeTypes';
import type { IExecutionResponse, IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';
@Service()
export class WaitingWebhooks {
constructor(private nodeTypes: NodeTypes) {}
async executeWebhook(
httpMethod: WebhookHttpMethod,
fullPath: string,
@ -79,14 +81,13 @@ export class WaitingWebhooks {
const { workflowData } = fullExecutionData;
const nodeTypes = Container.get(NodeTypes);
const workflow = new Workflow({
id: workflowData.id!.toString(),
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});

View file

@ -66,7 +66,7 @@ import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { findSubworkflowStart } from '@/utils';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services';
import { Container } from 'typedi';
@ -530,11 +530,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
const isManualMode = [this.mode, parentProcessMode].includes('manual');
try {
if (
!isManualMode &&
WorkflowHelpers.isWorkflowIdValid(this.workflowData.id) &&
newStaticData
) {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(
@ -641,7 +637,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}
const workflowId = this.workflowData.id;
if (WorkflowHelpers.isWorkflowIdValid(workflowId)) {
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}
@ -729,7 +725,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
newStaticData: IDataObject,
): Promise<void> {
try {
if (WorkflowHelpers.isWorkflowIdValid(this.workflowData.id) && newStaticData) {
if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(
@ -776,7 +772,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
}
const workflowId = this.workflowData.id;
if (WorkflowHelpers.isWorkflowIdValid(workflowId)) {
if (isWorkflowIdValid(workflowId)) {
fullExecutionData.workflowId = workflowId;
}

View file

@ -1,4 +1,5 @@
import { In } from 'typeorm';
import { Container } from 'typedi';
import type {
IDataObject,
IExecuteData,
@ -32,7 +33,7 @@ import type { User } from '@db/entities/User';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import omit from 'lodash.omit';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { Container } from 'typedi';
import { isWorkflowIdValid } from './utils';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@ -74,15 +75,6 @@ export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefi
return lastNodeRunData;
}
/**
* Returns if the given id is a valid workflow id
*
* @param {(string | null | undefined)} id The id to check
*/
export function isWorkflowIdValid(id: string | null | undefined): boolean {
return !(typeof id === 'string' && isNaN(parseInt(id, 10)));
}
/**
* Executes the error workflow
*

View file

@ -44,7 +44,8 @@ import type {
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import * as Queue from '@/Queue';
import type { Job, JobData, JobQueue, JobResponse } from '@/Queue';
import { Queue } from '@/Queue';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
@ -63,7 +64,7 @@ export class WorkflowRunner {
push: Push;
jobQueue: Queue.JobQueue;
jobQueue: JobQueue;
constructor() {
this.push = Container.get(Push);
@ -167,7 +168,7 @@ export class WorkflowRunner {
await initErrorHandling();
if (executionsMode === 'queue') {
const queue = await Queue.getInstance();
const queue = Container.get(Queue);
this.jobQueue = queue.getBullObjectInstance();
}
@ -434,7 +435,7 @@ export class WorkflowRunner {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
const jobData: Queue.JobData = {
const jobData: JobData = {
executionId,
loadStaticData: !!loadStaticData,
};
@ -451,7 +452,7 @@ export class WorkflowRunner {
removeOnComplete: true,
removeOnFail: true,
};
let job: Queue.Job;
let job: Job;
let hooks: WorkflowHooks;
try {
job = await this.jobQueue.add(jobData, jobOptions);
@ -485,7 +486,7 @@ export class WorkflowRunner {
async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
const queue = await Queue.getInstance();
const queue = Container.get(Queue);
await queue.stopJob(job);
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
@ -503,11 +504,11 @@ export class WorkflowRunner {
reject(error);
});
const jobData: Promise<Queue.JobResponse> = job.finished();
const jobData: Promise<JobResponse> = job.finished();
const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval');
const racingPromises: Array<Promise<Queue.JobResponse | object>> = [jobData];
const racingPromises: Array<Promise<JobResponse | object>> = [jobData];
let clearWatchdogInterval;
if (queueRecoveryInterval > 0) {

View file

@ -54,9 +54,9 @@ import config from '@/config';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { getLicense } from './License';
import { InternalHooks } from './InternalHooks';
import { PostHogClient } from './posthog';
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog';
class WorkflowRunnerProcess {
data: IWorkflowExecutionDataProcessWithExecution | undefined;
@ -127,7 +127,7 @@ class WorkflowRunnerProcess {
// Init db since we need to read the license.
await Db.init();
const license = getLicense();
const license = Container.get(License);
await license.init(instanceId);
// Start timeout for the execution

View file

@ -6,11 +6,10 @@ import { ExecutionBaseError } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as Db from '@/Db';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils';
import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
@ -101,7 +100,7 @@ export class Execute extends BaseCommand {
throw new Error('Failed to retrieve workflow data for requested workflow');
}
if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) {
if (!isWorkflowIdValid(workflowId)) {
workflowId = undefined;
}

View file

@ -28,7 +28,7 @@ import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
import { getLicense } from '@/License';
import { License } from '@/License';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
@ -183,7 +183,7 @@ export class Start extends BaseCommand {
}
async initLicense(): Promise<void> {
const license = getLicense();
const license = Container.get(License);
await license.init(this.instanceId);
const activationKey = config.getEnv('license.activationKey');

View file

@ -1,6 +1,7 @@
import express from 'express';
import http from 'http';
import type PCancelable from 'p-cancelable';
import { Container } from 'typedi';
import { flags } from '@oclif/command';
import { WorkflowExecute } from 'n8n-core';
@ -15,7 +16,8 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import config from '@/config';
import * as Queue from '@/Queue';
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue';
import { Queue } from '@/Queue';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants';
@ -38,7 +40,7 @@ export class Worker extends BaseCommand {
[key: string]: PCancelable<IRun>;
} = {};
static jobQueue: Queue.JobQueue;
static jobQueue: JobQueue;
/**
* Stop n8n in a graceful way.
@ -86,7 +88,7 @@ export class Worker extends BaseCommand {
await this.exitSuccessFully();
}
async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> {
async runJob(job: Job, nodeTypes: INodeTypes): Promise<JobResponse> {
const { executionId, loadStaticData } = job.data;
const executionDb = await Db.collections.Execution.findOneBy({ id: executionId });
@ -179,7 +181,7 @@ export class Worker extends BaseCommand {
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
const progress: Queue.WebhookResponse = {
const progress: WebhookResponse = {
executionId,
response: WebhookHelpers.encodeWebhookResponse(response),
};
@ -238,7 +240,8 @@ export class Worker extends BaseCommand {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
const queue = await Queue.getInstance();
const queue = Container.get(Queue);
await queue.init();
Worker.jobQueue = queue.getBullObjectInstance();
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, this.nodeTypes));
@ -248,7 +251,7 @@ export class Worker extends BaseCommand {
this.logger.info(` * Concurrency: ${flags.concurrency}`);
this.logger.info('');
Worker.jobQueue.on('global:progress', (jobId: Queue.JobId, progress) => {
Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => {
// Progress of a job got updated which does get used
// to communicate that a job got canceled.

View file

@ -14,7 +14,7 @@ import {
hashPassword,
validatePassword,
} from '@/UserManagement/UserManagementHelper';
import * as UserManagementMailer from '@/UserManagement/email';
import type { UserManagementMailer } from '@/UserManagement/email';
import { Response } from 'express';
import type { ILogger } from 'n8n-workflow';
@ -35,6 +35,8 @@ export class PasswordResetController {
private readonly internalHooks: IInternalHooksClass;
private readonly mailer: UserManagementMailer;
private readonly userRepository: Repository<User>;
constructor({
@ -42,18 +44,21 @@ export class PasswordResetController {
logger,
externalHooks,
internalHooks,
mailer,
repositories,
}: {
config: Config;
logger: ILogger;
externalHooks: IExternalHooksClass;
internalHooks: IInternalHooksClass;
mailer: UserManagementMailer;
repositories: Pick<IDatabaseCollections, 'User'>;
}) {
this.config = config;
this.logger = logger;
this.externalHooks = externalHooks;
this.internalHooks = internalHooks;
this.mailer = mailer;
this.userRepository = repositories.User;
}
@ -126,8 +131,7 @@ export class PasswordResetController {
url.searchParams.append('token', resetPasswordToken);
try {
const mailer = UserManagementMailer.getInstance();
await mailer.passwordReset({
await this.mailer.passwordReset({
email,
firstName,
lastName,

View file

@ -1,7 +1,8 @@
import config from '@/config';
import { getLicense } from '@/License';
import { License } from '@/License';
import { Container } from 'typedi';
export function isLogStreamingEnabled(): boolean {
const license = getLicense();
const license = Container.get(License);
return config.getEnv('enterprise.features.logStreaming') || license.isLogStreamingEnabled();
}

View file

@ -1,6 +1,7 @@
import { Container } from 'typedi';
import type { IExecutionFlattedDb } from '@/Interfaces';
import type { ExecutionStatus } from 'n8n-workflow';
import { getLicense } from '@/License';
import { License } from '@/License';
import config from '@/config';
export function getStatusUsingPreviousExecutionStatusMethod(
@ -20,7 +21,7 @@ export function getStatusUsingPreviousExecutionStatusMethod(
}
export function isAdvancedExecutionFiltersEnabled(): boolean {
const license = getLicense();
const license = Container.get(License);
return (
config.getEnv('enterprise.features.advancedExecutionFilters') ||
license.isAdvancedExecutionFiltersEnabled()

View file

@ -26,7 +26,7 @@ import type {
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import * as Queue from '@/Queue';
import { Queue } from '@/Queue';
import type { ExecutionRequest } from '@/requests';
import * as ResponseHelper from '@/ResponseHelper';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
@ -197,7 +197,7 @@ export class ExecutionsService {
const executingWorkflowIds: string[] = [];
if (config.getEnv('executions.mode') === 'queue') {
const queue = await Queue.getInstance();
const queue = Container.get(Queue);
const currentJobs = await queue.getJobs(['active', 'waiting']);
executingWorkflowIds.push(...currentJobs.map(({ data }) => data.executionId));
}

View file

@ -7,7 +7,6 @@ export * from './WaitingWebhooks';
export * from './WorkflowCredentials';
export * from './WorkflowRunner';
import { ActiveExecutions } from './ActiveExecutions';
import * as Db from './Db';
import * as GenericHelpers from './GenericHelpers';
import * as ResponseHelper from './ResponseHelper';
@ -19,7 +18,6 @@ import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData'
import * as WorkflowHelpers from './WorkflowHelpers';
export {
ActiveExecutions,
Db,
GenericHelpers,
ResponseHelper,

View file

@ -1,4 +1,5 @@
import { getLicense } from '@/License';
import { Container } from 'typedi';
import { License } from '@/License';
import type { ILicenseReadResponse } from '@/Interfaces';
import * as Db from '@/Db';
@ -11,7 +12,7 @@ export class LicenseService {
// Helper for getting the basic license data that we want to return
static async getLicenseData(): Promise<ILicenseReadResponse> {
const triggerCount = await LicenseService.getActiveTriggerCount();
const license = getLicense();
const license = Container.get(License);
const mainPlan = license.getMainPlan();
return {

View file

@ -7,7 +7,7 @@ import { getLogger } from '@/Logger';
import * as ResponseHelper from '@/ResponseHelper';
import type { ILicensePostResponse, ILicenseReadResponse } from '@/Interfaces';
import { LicenseService } from './License.service';
import { getLicense } from '@/License';
import { License } from '@/License';
import type { AuthenticatedRequest, LicenseRequest } from '@/requests';
import { isInstanceOwner } from '@/PublicApi/v1/handlers/users/users.service';
import { Container } from 'typedi';
@ -69,7 +69,7 @@ licenseController.post(
'/activate',
ResponseHelper.send(async (req: LicenseRequest.Activate): Promise<ILicensePostResponse> => {
// Call the license manager activate function and tell it to throw an error
const license = getLicense();
const license = Container.get(License);
try {
await license.activate(req.body.activationKey);
} catch (e) {
@ -111,7 +111,7 @@ licenseController.post(
'/renew',
ResponseHelper.send(async (): Promise<ILicensePostResponse> => {
// Call the license manager activate function and tell it to throw an error
const license = getLicense();
const license = Container.get(License);
try {
await license.renew();
} catch (e) {

View file

@ -15,7 +15,7 @@ import { NoXss } from '@db/utils/customValidators';
import type { PublicUser, IExecutionDeleteFilter, IWorkflowDb } from '@/Interfaces';
import type { Role } from '@db/entities/Role';
import type { User } from '@db/entities/User';
import type * as UserManagementMailer from '@/UserManagement/email/UserManagementMailer';
import type { UserManagementMailer } from '@/UserManagement/email';
export class UserUpdatePayload implements Pick<User, 'email' | 'firstName' | 'lastName'> {
@IsEmail()
@ -46,7 +46,7 @@ export type AuthenticatedRequest<
RequestQuery = {},
> = Omit<express.Request<RouteParams, ResponseBody, RequestBody, RequestQuery>, 'user'> & {
user: User;
mailer?: UserManagementMailer.UserManagementMailer;
mailer?: UserManagementMailer;
globalMemberRole?: Role;
};

View file

@ -1,4 +1,5 @@
import type express from 'express';
import { Service } from 'typedi';
import * as Db from '@/Db';
import type { User } from '@/databases/entities/User';
import { jsonParse, LoggerProxy } from 'n8n-workflow';
@ -26,9 +27,8 @@ import type { SamlLoginBinding } from './types';
import type { BindingContext, PostBindingContext } from 'samlify/types/src/entity';
import { validateMetadata, validateResponse } from './samlValidator';
@Service()
export class SamlService {
private static instance: SamlService;
private identityProviderInstance: IdentityProviderInstance | undefined;
private _samlPreferences: SamlPreferences = {
@ -65,13 +65,6 @@ export class SamlService {
};
}
static getInstance(): SamlService {
if (!SamlService.instance) {
SamlService.instance = new SamlService();
}
return SamlService.instance;
}
async init(): Promise<void> {
await this.loadFromDbAndApplySamlPreferences();
setSchemaValidator({

View file

@ -1,10 +1,11 @@
import { Container } from 'typedi';
import config from '@/config';
import * as Db from '@/Db';
import { AuthIdentity } from '../../databases/entities/AuthIdentity';
import { User } from '../../databases/entities/User';
import { getLicense } from '../../License';
import { AuthError } from '../../ResponseHelper';
import { hashPassword, isUserManagementEnabled } from '../../UserManagement/UserManagementHelper';
import { AuthIdentity } from '@db/entities/AuthIdentity';
import { User } from '@db/entities/User';
import { License } from '@/License';
import { AuthError } from '@/ResponseHelper';
import { hashPassword, isUserManagementEnabled } from '@/UserManagement/UserManagementHelper';
import type { SamlPreferences } from './types/samlPreferences';
import type { SamlUserAttributes } from './types/samlUserAttributes';
import type { FlowResult } from 'samlify/types/src/flow';
@ -44,7 +45,7 @@ export function setSamlLoginLabel(label: string): void {
}
export function isSamlLicensed(): boolean {
const license = getLicense();
const license = Container.get(License);
return (
isUserManagementEnabled() &&
(license.isSamlEnabled() || config.getEnv(SAML_ENTERPRISE_FEATURE_ENABLED))

View file

@ -7,7 +7,7 @@ import { LoggerProxy } from 'n8n-workflow';
import config from '@/config';
import type { IExecutionTrackProperties } from '@/Interfaces';
import { getLogger } from '@/Logger';
import { getLicense } from '@/License';
import { License } from '@/License';
import { LicenseService } from '@/license/License.service';
import { N8N_VERSION } from '@/constants';
import { Service } from 'typedi';
@ -39,7 +39,7 @@ export class Telemetry {
private executionCountsBuffer: IExecutionsBuffer = {};
constructor(private postHog: PostHogClient) {}
constructor(private postHog: PostHogClient, private license: License) {}
setInstanceId(instanceId: string) {
this.instanceId = instanceId;
@ -97,8 +97,8 @@ export class Telemetry {
// License info
const pulsePacket = {
plan_name_current: getLicense().getPlanName(),
quota: getLicense().getTriggerLimit(),
plan_name_current: this.license.getPlanName(),
quota: this.license.getTriggerLimit(),
usage: await LicenseService.getActiveTriggerCount(),
};
allPromises.push(this.track('pulse', pulsePacket));

View file

@ -3,6 +3,13 @@ import { CliWorkflowOperationError, SubworkflowOperationError } from 'n8n-workfl
import type { INode } from 'n8n-workflow';
import { START_NODES } from './constants';
/**
* Returns if the given id is a valid workflow id
*/
export function isWorkflowIdValid(id: string | null | undefined): boolean {
return !(typeof id === 'string' && isNaN(parseInt(id, 10)));
}
function findWorkflowStart(executionMode: 'integrated' | 'cli') {
return function (nodes: INode[]) {
const executeWorkflowTriggerNode = nodes.find(

View file

@ -41,7 +41,7 @@ import type { User } from '@db/entities/User';
import { getLogger } from '@/Logger';
import { loadPublicApiVersions } from '@/PublicApi/';
import { issueJWT } from '@/auth/jwt';
import * as UserManagementMailer from '@/UserManagement/email/UserManagementMailer';
import { UserManagementMailer } from '@/UserManagement/email/UserManagementMailer';
import {
AUTHLESS_ENDPOINTS,
COMMUNITY_NODE_VERSION,
@ -177,7 +177,7 @@ export async function initTestServer({
if (functionEndpoints.length) {
const externalHooks = Container.get(ExternalHooks);
const internalHooks = Container.get(InternalHooks);
const mailer = UserManagementMailer.getInstance();
const mailer = Container.get(UserManagementMailer);
const repositories = Db.collections;
for (const group of functionEndpoints) {
@ -226,6 +226,7 @@ export async function initTestServer({
logger,
externalHooks,
internalHooks,
mailer,
repositories,
}),
);

View file

@ -20,7 +20,7 @@ import * as testDb from './shared/testDb';
import type { AuthAgent } from './shared/types';
import * as utils from './shared/utils';
import * as UserManagementMailer from '@/UserManagement/email/UserManagementMailer';
import { UserManagementMailer } from '@/UserManagement/email/UserManagementMailer';
import { NodeMailer } from '@/UserManagement/email/NodeMailer';
jest.mock('@/UserManagement/email/NodeMailer');
@ -512,7 +512,7 @@ test('UserManagementMailer expect NodeMailer.verifyConnection not be called when
const mockVerifyConnection = jest.spyOn(NodeMailer.prototype, 'verifyConnection');
mockVerifyConnection.mockImplementation(async () => {});
const userManagementMailer = UserManagementMailer.getInstance();
const userManagementMailer = new UserManagementMailer();
// NodeMailer.verifyConnection gets called only explicitly
expect(async () => await userManagementMailer.verifyConnection()).rejects.toThrow();
@ -531,7 +531,7 @@ test('UserManagementMailer expect NodeMailer.verifyConnection to be called when
config.set('userManagement.emails.smtp.host', 'host');
config.set('userManagement.emails.mode', 'smtp');
const userManagementMailer = new UserManagementMailer.UserManagementMailer();
const userManagementMailer = new UserManagementMailer();
// NodeMailer.verifyConnection gets called only explicitly
expect(async () => await userManagementMailer.verifyConnection()).not.toThrow();

View file

@ -27,6 +27,8 @@ import { Container } from 'typedi';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { mockInstance } from '../integration/shared/utils';
import { Push } from '@/push';
import { ActiveExecutions } from '@/ActiveExecutions';
import { NodeTypes } from '@/NodeTypes';
/**
* TODO:
@ -157,12 +159,17 @@ describe('ActiveWorkflowRunner', () => {
beforeEach(() => {
externalHooks = mock();
activeWorkflowRunner = new ActiveWorkflowRunner(externalHooks);
activeWorkflowRunner = new ActiveWorkflowRunner(
new ActiveExecutions(),
externalHooks,
Container.get(NodeTypes),
);
});
afterEach(async () => {
await activeWorkflowRunner.removeAll();
databaseActiveWorkflowsCount = 0;
databaseActiveWorkflowsList = [];
jest.clearAllMocks();
});

View file

@ -2,6 +2,7 @@ import { Telemetry } from '@/telemetry';
import config from '@/config';
import { flushPromises } from './Helpers';
import { PostHogClient } from '@/posthog';
import { mock } from 'jest-mock-extended';
jest.unmock('@/telemetry');
jest.mock('@/license/License.service', () => {
@ -45,7 +46,7 @@ describe('Telemetry', () => {
const postHog = new PostHogClient();
postHog.init(instanceId);
telemetry = new Telemetry(postHog);
telemetry = new Telemetry(postHog, mock());
telemetry.setInstanceId(instanceId);
(telemetry as any).rudderStack = {
flush: () => {},